#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
#          http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description  : DbClusterInfo.py is a utility to get cluster information
#############################################################################
# Standard library imports
import argparse
import binascii
import copy
import datetime
import getopt
import getpass
import ipaddress
import json
import logging.handlers as _handlers
import os
import pwd
import re
import shutil
import socket
import stat
import struct
import subprocess
import sys
import tempfile
import threading
import time
import types
import xml.etree.ElementTree as ET
import xml.etree.cElementTree as ETree

sys.path.append(os.path.split(os.path.realpath(__file__))[0] + "/../../")

# Network constants
NET_IPV6 = "ipv6"
NET_IPV4 = "ipv4"
# Index positions used when unpacking address records.
# NOTE(review): the record layout is not visible in this chunk — confirm
# against the call sites before relying on these indexes.
ADDRESS_FAMILY_INDEX = 4
IP_ADDRESS_INDEX = 0

# Environment variable that points at the cluster XML configuration file
ENV_CLUSTERCONFIG = "CLUSTERCONFIGFILE"

# Directories searched when resolving external commands; CMD_CACHE is the
# memo table for already-resolved command paths
CMD_PATH = ['/bin', '/usr/local/bin', '/usr/bin', '/sbin', '/usr/sbin']
CMD_CACHE = {}

# String constants
BLANK_SPACE = " "
COLON = ":"

# Instance role constants. The non-negative values are also used as list
# indexes into role-ordered tables (see dbNodeInfo.__assignNewInstancePort),
# so their numbering must not change.
INSTANCE_ROLE_UNDEFINED = -1
INSTANCE_ROLE_CMSERVER = 0
INSTANCE_ROLE_GTM = 1
INSTANCE_ROLE_ETCD = 2
INSTANCE_ROLE_COODINATOR = 3
INSTANCE_ROLE_DATANODE = 4
INSTANCE_ROLE_CMAGENT = 5

# Starting (base) instance id for each role; new instance ids count up
# from these values
BASE_ID_CMSERVER = 1
BASE_ID_GTM = 1001
BASE_ID_CMAGENT = 10001
BASE_ID_DUMMYDATANODE = 3001
BASE_ID_COORDINATOR = 5001
BASE_ID_DATANODE = 6001
BASE_ID_ETCD = 7001

# Directory permission bits (octal 750: rwxr-x---)
DIRECTORY_PERMISSION = 0o750

# Primary/standby instance ID constants
OLD_LAST_PRIMARYSTANDBY_BASEID_NUM = 7000
NEW_FIRST_PRIMARYSTANDBY_BASEID_NUM = 40000

# Master instance default (base) ports, one value per instance role
MASTER_BASEPORT_CMS = 5000
MASTER_BASEPORT_GTM = 6000
MASTER_BASEPORT_CMAGENT = 0  # cm agent has no port; the value only fills its slot
MASTER_BASEPORT_COO = 8000
MASTER_BASEPORT_DATA = 40000
MASTER_BASEPORT_ETCD = 2379
# Standby instance default (base) ports, one value per instance role
STANDBY_BASEPORT_CMS = 5500
STANDBY_BASEPORT_GTM = 6500
STANDBY_BASEPORT_CMAGENT = 0  # cm agent has no port; the value only fills its slot
STANDBY_BASEPORT_COO = 8500
STANDBY_BASEPORT_DATA = 45000
STANDBY_BASEPORT_ETCD = 2380

# Instance type constants (only meaningful for CN/DN instances)
INSTANCE_TYPE_UNDEFINED = -1
MASTER_INSTANCE = 0
STANDBY_INSTANCE = 1
DUMMY_STANDBY_INSTANCE = 2
CASCADE_STANDBY = 3

# Human-readable names for the instance types above.
# NOTE(review): DUMMY_STANDBY_INSTANCE has no entry here — confirm whether
# lookups guard against that key before relying on this mapping.
DICT_INSTANCE = {
    MASTER_INSTANCE: "primary",
    STANDBY_INSTANCE: "standby",
    CASCADE_STANDBY: "cascade_standby"
}

# Instance number constants
MIRROR_COUNT_REPLICATION_MAX = 9  # max replica count for CLUSTER_TYPE_SINGLE_PRIMARY_MULTI_STANDBY
AZPRIORITY_MAX = 10  # max azPriority for CLUSTER_TYPE_SINGLE_PRIMARY_MULTI_STANDBY
AZPRIORITY_MIN = 1   # min azPriority for CLUSTER_TYPE_SINGLE_PRIMARY_MULTI_STANDBY
PORT_STEP_SIZE = 20  # DB port step size for CLUSTER_TYPE_SINGLE_PRIMARY_MULTI_STANDBY

MIRROR_ID_AGENT = -3

# Cluster type constants
CLUSTER_TYPE_SINGLE = "single"
CLUSTER_TYPE_SINGLE_PRIMARY_MULTI_STANDBY = "single-primary-multi-standby"
CLUSTER_TYPE_SINGLE_INST = "single-inst"

# Default static-config version numbers per cluster type (used by gs_upgrade)
BIN_CONFIG_VERSION = 2
BIN_CONFIG_VERSION_SINGLE = 101
BIN_CONFIG_VERSION_SINGLE_PRIMARY_MULTI_STANDBY = 201
BIN_CONFIG_VERSION_SINGLE_INST = 301

# Other constants
PAGE_SIZE = 8192
MAX_IP_NUM = 3
CONFIG_IP_NUM = 1

# Column-width constants for fixed-width status output
NODE_ID_LEN = 2
INSTANCE_ID_LEN = 8
SPACE_LEN = 1
STATE_LEN = 17
SEPERATOR_LEN = 1
IP_LEN = 16
PORT_LEN = 10

# Key directory permission as a *decimal* literal (700, not octal 0o700).
# NOTE(review): looks like it is meant to be interpolated into a
# "chmod 700 ..." command string — confirm at the call sites.
KEY_DIRECTORY_MODE = 700

# Network type constants
g_networkType = 0  # default network type: single plane

# Global variables
global_cls_query_rst = {}  # cache for gr_om query-instance results
xmlRootNode = None  # parsed root of the cluster XML configuration

# Log constants.
# Severity values are compared against CMLog.expectLevel; WARNING sits
# between INFO and ERROR, hence the fractional value.
MAXLOGFILESIZE = 16 * 1024 * 1024
LOG_DEBUG = 1
LOG_INFO = 2
LOG_WARNING = 2.1
LOG_ERROR = 3
LOG_FATAL = 4

class CMLog:
    """
    Size-rotated log file writer.

    Log files are named "<prefix>-YYYY-mm-DD_HHMMSS<suffix>" inside logPath.
    An existing file matching that pattern is re-opened and appended to;
    once a file grows past MAXLOGFILESIZE a fresh timestamped file is
    started (unless running as root). Records are written under a lock,
    so one instance may be shared by several threads.
    """

    def __init__(self, logPath, module, prefix, suffix=".log", expectLevel=LOG_DEBUG, traceId=None):
        """
        Initialize the CMLog instance and open the current log file.

        Args:
            logPath (str): log directory; created with mode 0o700 if missing
            module (str): module name stamped into every record
            prefix (str): log file name prefix
            suffix (str): log file name suffix (default: ".log")
            expectLevel (int): minimum level that is actually written
                (default: LOG_DEBUG)
            traceId (str): optional trace id stamped into every record
        """
        self.logFile = ""
        self.expectLevel = expectLevel
        self.moduleName = module
        self.fp = None
        self.size = 0
        self.suffix = suffix
        self.prefix = prefix
        self.logPath = logPath
        self.pid = os.getpid()
        self.step = 0
        self.lock = threading.Lock()
        self.tmpFile = None
        # when True, __writeLog failures are swallowed instead of raised
        self.ignoreErr = False
        self.traceId = traceId

        try:
            # Create the directory when it does not exist yet; only reject
            # the path when it exists but is not a directory. (Checking
            # isdir() first would exit before makedirs() could ever run,
            # leaving the creation branch unreachable.)
            if not os.path.exists(logPath):
                try:
                    os.makedirs(logPath, 0o700)
                except Exception as e:
                    raise Exception(ErrorCode.GAUSS_502["GAUSS_50208"] %
                                    logPath + " Error:\n%s" % str(e))
            elif not os.path.isdir(logPath):
                print(ErrorCode.GAUSS_502["GAUSS_50211"] % logPath)
                sys.exit(1)
            # create new log file
            self.__openLogFile()
        except Exception as ex:
            print(str(ex))
            sys.exit(1)

    def __checkLink(self):
        """
        Refuse to write through a symbolic link (defends against link attacks).

        Raises:
            Exception: when self.logFile is a symbolic link
        """
        if os.path.islink(self.logFile):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50206"] % self.logFile)

    def __checkLogFileExist(self):
        """
        Search logPath for an existing log file named
        "<prefix>-YYYY-mm-DD_HHMMSS<suffix>" and remember the newest match
        in self.logFile.

        Returns:
            bool: True when a reusable log file was found
        """
        try:
            # os.listdir replaces the old "ls | grep" shell pipeline:
            # no shell quoting issues and no temporary listing file.
            candidates = os.listdir(self.logPath)
        except Exception:
            return False
        filenameList = []
        for filename in candidates:
            if not filename.startswith(self.prefix + "-"):
                continue
            # more than one dot means extra extensions (e.g. compressed)
            if len(filename.split(".")) > 2:
                continue
            (existedPrefix, existedSuffix) = \
                os.path.splitext(filename)
            if existedSuffix != self.suffix:
                continue
            # "-YYYY-mm-DD_HHMMSS" contributes exactly 18 characters
            if len(filename) != len(self.prefix) + \
                len(self.suffix) + 18:
                continue
            timeStamp = existedPrefix[-17:]
            # keep only names whose tail parses as a valid timestamp
            if self.__isValidDate(timeStamp):
                filenameList.append(filename)

        if len(filenameList) == 0:
            return False
        # zero-padded timestamps sort lexicographically: max() is newest
        fileName = max(filenameList)
        self.logFile = os.path.join(self.logPath, fileName)
        self.__checkLink()
        return True

    def __openLogFile(self):
        """
        Open the current log file, creating a new timestamped file when no
        reusable one exists.

        Raises:
            Exception: when the file cannot be created or opened
        """
        try:
            if self.__checkLogFileExist():
                self.fp = open(self.logFile, "a")
                return
            # get current time
            currentTime = time.strftime("%Y-%m-%d_%H%M%S")
            # init log file
            self.logFile = os.path.join(self.logPath, self.prefix + "-" + currentTime + self.suffix)
            # Creation is retried a few times because several processes may
            # race to create the same file concurrently.
            retryTimes = 3
            count = 0
            while (True):
                (status, output) = self.__createLogFile()
                if status == 0:
                    break
                count = count + 1
                time.sleep(1)
                if (count > retryTimes):
                    raise Exception(output)
            # open log file
            self.__checkLink()
            self.fp = open(self.logFile, "a")
        except Exception as e:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50206"]
                            % self.logFile + " Error:\n%s" % str(e))

    def __createLogFile(self):
        """
        Create the log file if it does not exist.

        Returns:
            tuple: (0, "") on success, (1, error text) on failure
        """
        try:
            if (not os.path.exists(self.logFile)):
                os.mknod(self.logFile)
            return (0, "")
        except Exception as e:
            return (1, str(e))

    def __isValidDate(self, datastr):
        """
        Check whether datastr parses as a "%Y-%m-%d_%H%M%S" timestamp.

        Args:
            datastr (str): candidate timestamp text
        Returns:
            bool: True when the text is a valid timestamp
        """
        try:
            time.strptime(datastr, "%Y-%m-%d_%H%M%S")
            return True
        except Exception:
            return False

    def closeLog(self):
        """
        Flush and close the log file handle.

        Raises:
            Exception: when flushing/closing fails (the handle is still
                closed on a best-effort basis)
        """
        try:
            if self.fp:
                self.fp.flush()
                self.fp.close()
                self.fp = None
        except Exception as ex:
            if self.fp:
                self.fp.close()
            raise Exception(str(ex))

    # print the flow message to console window and log file
    # AddInfo: constant represent step constant, addStep represent step
    # plus, None represent no step
    def log(self, msg, stepFlag=""):
        """
        Print the flow message to the console window and the log file.

        Args:
            msg (str): message text
            stepFlag (str): "" hides step info, "addStep" advances the step
                counter by 1, "constant" repeats the current step
        """
        if (LOG_INFO >= self.expectLevel):
            print(msg)
            self.__writeLog("LOG", msg, stepFlag)

    # print the flow message to log file only
    def debug(self, msg, stepFlag=""):
        """
        Print the flow message to the log file only.

        Args:
            msg (str): message text
            stepFlag (str): "" hides step info, "addStep" advances the step
                counter by 1, "constant" repeats the current step
        """
        if (LOG_DEBUG >= self.expectLevel):
            self.__writeLog("DEBUG", msg, stepFlag)

    def warn(self, msg, stepFlag=""):
        """
        Print the warning message to the console window and the log file.

        Args:
            msg (str): message text
            stepFlag (str): "" hides step info, "addStep" advances the step
                counter by 1, "constant" repeats the current step
        """
        if (LOG_WARNING >= self.expectLevel):
            print(msg)
            self.__writeLog("WARNING", msg, stepFlag)

    # print the error message to console window and log file
    def error(self, msg):
        """
        Print the error message to the console window and the log file.

        Args:
            msg (str): message text
        """
        if (LOG_ERROR >= self.expectLevel):
            print(msg)
            self.__writeLog("ERROR", msg)

    # print the error message to console window and log file,then exit
    def logExit(self, msg):
        """
        Print the error message to the console window and the log file,
        then close the log and terminate the process with status 1.

        Args:
            msg (str): message text
        """
        if (LOG_FATAL >= self.expectLevel):
            print(msg)
            try:
                self.__writeLog("ERROR", msg)
            except Exception as ex:
                print(str(ex))
        self.closeLog()
        sys.exit(1)

    def Step(self, stepFlag):
        """
        Return the step number, advancing it unless stepFlag is "constant".

        Args:
            stepFlag (str): "constant" keeps the current step; any other
                value increments it first
        Returns:
            int: the (possibly advanced) step number
        """
        if (stepFlag == "constant"):
            return self.step
        else:
            self.step = self.step + 1
            return self.step

    def __getLogFileLine(self):
        # Three frames up from here: __getLogFileLine -> __writeLog ->
        # log/debug/warn/... -> the caller we want to report. The fixed
        # depth only works when called from __writeLog.
        f = sys._getframe().f_back.f_back.f_back
        return "%s(%s:%s)" % (os.path.basename(f.f_code.co_filename), f.f_code.co_name,
                              str(f.f_lineno))

    def __writeLog(self, level, msg, stepFlag=""):
        """
        Format one record and append it to the log file.

        Args:
            level (str): level tag written into the record, e.g. "DEBUG"
            msg (str): message text; "-W <secret>" (and "-A <secret>" for
                gs_redis commands) password arguments are masked
            stepFlag (str): "" for no step info, "addStep"/"constant" to
                include a step number
        Raises:
            Exception: on write failure, unless self.ignoreErr is set
        """
        if self.fp is None:
            return

        try:
            # "with" guarantees the lock is released on every exit path
            with self.lock:
                # re-create the log file if it was removed underneath us
                if (not os.path.exists(self.logFile)):
                    self.__openLogFile()
                else:
                    logPer = oct(os.stat(self.logFile).st_mode)[-3:]
                    self.__checkLink()
                    # force the expected restrictive permission
                    if not logPer == "600":
                        os.chmod(self.logFile, 0o600)
                # switch to a new log file once the current one is full;
                # root keeps appending to the same file
                self.size = os.path.getsize(self.logFile)
                if self.size >= MAXLOGFILESIZE and os.getuid() != 0:
                    self.closeLog()
                    self.__openLogFile()

                # mask password values passed via "-W <secret>"
                replaceReg = re.compile(r'-W[ ]*[^ ]*[ ]*')
                msg = replaceReg.sub('-W *** ', str(msg))

                # gs_redis commands also carry secrets behind "-A"
                if msg.find("gs_redis") >= 0:
                    replaceReg = re.compile(r'-A[ ]*[^ ]*[ ]*')
                    msg = replaceReg.sub('-A *** ', str(msg))

                strTime = datetime.datetime.now()
                fileLine = self.__getLogFileLine()
                if stepFlag == "":
                    if self.traceId:
                        print("[%s][%s][%d][%s][%s]:%s"
                              % (self.traceId, strTime, self.pid, self.moduleName,
                                 level, msg), file=self.fp)
                    else:
                        print("[%s][%d][%s][%s]:%s" % (
                            strTime, self.pid, self.moduleName, level, msg),
                            file=self.fp)
                else:
                    stepnum = self.Step(stepFlag)
                    print("[%s][%d][%s][%s][%s][Step%d]:%s" % (
                        strTime, self.pid, fileLine, self.moduleName, level, stepnum, msg),
                          file=self.fp)
                self.fp.flush()
        except Exception as ex:
            if self.ignoreErr:
                return
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50205"]
                            % (("log file %s") % self.logFile) +
                            " Error:\n%s" % str(ex))

    @staticmethod
    def exitWithError(msg, status=1):
        """
        Write msg to stderr and exit with the given status.

        Args:
            msg (str): message text
            status (int): process exit status (default: 1)
        """
        sys.stderr.write("%s\n" % msg)
        sys.exit(status)

    @staticmethod
    def printMessage(msg):
        """
        Write msg to stdout followed by a newline.

        Args:
            msg (str): message text
        """
        sys.stdout.write("%s\n" % msg)

class peerInstanceInfo():
    """
    HA peer information of an instance: one mandatory peer plus an
    optional second peer whose fields default to falsy values.
    """

    def __init__(self):
        # first (mandatory) peer
        self.peerDataPath = ""
        self.peerHAIPs = []
        self.peerHAPort = 0
        self.peerRole = 0
        # optional second peer; rendered only when set
        self.peer2DataPath = ""
        self.peer2HAIPs = []
        self.peer2HAPort = 0
        self.peer2Role = 0

    def __str__(self):
        """
        Render the peer info as comma-separated "key=value" pairs;
        second-peer fields appear only when they are non-empty/non-zero.
        """
        parts = ["peerDataPath=%s,peerHAPort=%d,peerRole=%d" % (
            self.peerDataPath, self.peerHAPort, self.peerRole)]
        if self.peer2DataPath:
            parts.append("peer2DataPath=%s" % self.peer2DataPath)
        if self.peer2HAPort:
            parts.append("peer2HAPort=%d" % self.peer2HAPort)
        if self.peer2Role:
            parts.append("peer2Role=%d" % self.peer2Role)
        return ",".join(parts)

class instanceInfo():
    """
    Describes one component instance (cm_server, cm_agent, gtm, etcd,
    coordinator or datanode) deployed on a host.
    """

    def __init__(self, instId=0, mirrorId=0):
        """
        Build an instance record with default (empty) settings.

        Args:
            instId (int): unique instance id
            mirrorId (int): id of the mirror group the instance belongs to
        """
        # identity
        self.instanceId = instId
        self.mirrorId = mirrorId
        self.hostname = ""
        # network endpoints
        self.listenIps = []
        self.haIps = []
        self.float_ips = []
        self.port = 0
        # pool port for a coordinator, HA port for every other role
        self.haPort = 0
        # filesystem locations
        self.datadir = ""
        self.xlogdir = ""
        self.ssdDir = ""
        # classification (only CN/DN use instanceType)
        self.instanceType = INSTANCE_TYPE_UNDEFINED
        self.instanceRole = INSTANCE_ROLE_UNDEFINED
        # rack info
        self.rack = ""
        # oltp zpaxos sub instance type
        self.subInstanceType = INSTANCE_ROLE_UNDEFINED
        self.level = 1
        # port/haPort double as peerPort/clientPort for etcd;
        # for datanodes, haPort doubles as the repl port
        self.replport = 0
        self.sctpPort = 0
        self.controlPort = 0
        # availability zone
        self.azName = ""
        self.azPriority = 0
        self.clusterName = ""
        # etcd-specific ports
        self.peerPort = 0
        self.clientPort = 0
        # instance name
        self.name = ""
        # dynamic state captured from the running cluster
        self.state = ""
        self.staticConnections = ""
        # DB role such as Primary, Standby
        self.localRole = ""
        self.peerInstanceInfos = []
        # replication settings
        self.syncNum = -1
        self.syncNumFirst = ""
        self.cascadeRole = "off"
        # dcf data path
        self.dcf_data_path = ""

    def __cmp__(self, target):
        """
        Compare by instanceId; any type/attribute mismatch yields 1.
        """
        comparable = (type(self) == type(target)
                      and isinstance(target, instanceInfo)
                      and hasattr(target, "instanceId"))
        if not comparable:
            return 1
        return self.instanceId - target.instanceId

    def __str__(self):
        """
        Render the instance as comma-separated "key=value" pairs; optional
        fields are appended only when they are set.
        """
        pieces = ["InstanceId=%s,MirrorId=%s,Host=%s,Port=%s,DataDir=%s,"
                  "XlogDir=%s,SsdDir=%s,InstanceType=%s,Role=%s,ListenIps=%s,"
                  "HaIps=%s" % (
                      self.instanceId, self.mirrorId, self.hostname,
                      self.port, self.datadir, self.xlogdir, self.ssdDir,
                      self.instanceType, self.instanceRole, self.listenIps,
                      self.haIps)]
        if self.rack:
            pieces.append("rack=%s" % self.rack)
        if self.replport:
            pieces.append("replport=%s" % self.replport)
        if self.sctpPort:
            pieces.append("sctpPort=%s" % self.sctpPort)
        if self.controlPort:
            pieces.append("controlPort=%s" % self.controlPort)
        if self.azName:
            pieces.append("azName=%s" % self.azName)
        if hasattr(self, 'azPriority') and self.azPriority > 0:
            pieces.append("azPriority=%s" % self.azPriority)
        if self.clusterName:
            pieces.append("clusterName=%s" % self.clusterName)
        if self.peerPort:
            pieces.append("peerPort=%s" % self.peerPort)
        if self.clientPort:
            pieces.append("clientPort=%s" % self.clientPort)
        if self.name:
            pieces.append("name=%s" % self.name)
        return ",".join(pieces)

class dbNodeInfo():
    """
    All instances deployed on one node.

    Holds per-role instance lists (cm_server, cm_agent, gtm, cn, dn, etcd)
    and assigns instance ports from the role-indexed base-port tables.
    """

    def __init__(self, nodeId=0, name=""):
        """
        Build an empty node description.

        Args:
            nodeId (int): node id inside the cluster
            name (str): host name of the node
        """
        # node identity
        self.id = nodeId
        self.name = name
        # node networking
        self.backIps = []
        self.virtualIp = []
        self.sshIps = []
        # instance counts per role
        self.cmsNum = 0
        self.cooNum = 0
        self.dataNum = 0
        self.gtmNum = 0
        self.etcdNum = 0
        # cm_servers instances
        self.cmservers = []
        # cn instances
        self.coordinators = []
        # DB (datanode) instances
        self.datanodes = []
        # gtm instances
        self.gtms = []
        # cm_agent instances
        self.cmagents = []
        # etcd instances
        self.etcds = []
        # cm_server/cm_agent data directory
        self.cmDataDir = ""
        self.dummyStandbyBasePort = 0
        # Base ports are looked up as masterBasePorts[instRole] /
        # standbyBasePorts[instRole] (see __assignNewInstancePort), so both
        # lists MUST follow the INSTANCE_ROLE_* numbering: CMSERVER=0,
        # GTM=1, ETCD=2, COODINATOR=3, DATANODE=4, CMAGENT=5. The previous
        # ordering (COO, DATA, ETCD in slots 2-4) handed the etcd port to
        # coordinators and the coordinator port to etcd.
        self.masterBasePorts = [MASTER_BASEPORT_CMS, MASTER_BASEPORT_GTM,
                                MASTER_BASEPORT_ETCD, MASTER_BASEPORT_COO,
                                MASTER_BASEPORT_DATA,
                                MASTER_BASEPORT_CMAGENT]
        self.standbyBasePorts = [STANDBY_BASEPORT_CMS, STANDBY_BASEPORT_GTM,
                                 STANDBY_BASEPORT_ETCD, STANDBY_BASEPORT_COO,
                                 STANDBY_BASEPORT_DATA,
                                 STANDBY_BASEPORT_CMAGENT]
        # azName
        self.azName = ""
        self.azPriority = 1
        self.standbyDnNum = 0
        self.dummyStandbyDnNum = 0
        self.cascadeRole = "off"
        self.ssh_port = 0
        # gr
        self.grIp1 = ""
        self.listen_addr = ""

    def __cmp__(self, target):
        """
        Compare by node id; any type/attribute mismatch yields 1.
        """
        if (type(self) != type(target)):
            return 1
        if (not isinstance(target, dbNodeInfo)):
            return 1
        if (not hasattr(target, "id")):
            return 1
        else:
            return self.id - target.id

    def __str__(self):
        """
        Construct a printable string representation of a dbNodeInfo:
        the host line followed by one line per instance.

        Returns:
            str: multi-line description of the node
        """
        retStr = "HostName=%s,backIps=%s" % (self.name, self.backIps)
        # cm_server instance information
        for cmsInst in self.cmservers:
            retStr += "\n%s" % str(cmsInst)
        # cm_agent instance information
        for cmaInst in self.cmagents:
            retStr += "\n%s" % str(cmaInst)
        # gtm instance information
        for gtmInst in self.gtms:
            retStr += "\n%s" % str(gtmInst)
        # cn instance information
        for cooInst in self.coordinators:
            retStr += "\n%s" % str(cooInst)
        # DB instance information
        for dataInst in self.datanodes:
            retStr += "\n%s" % str(dataInst)
        # etcd instance information
        for dataInst in self.etcds:
            retStr += "\n%s" % str(dataInst)

        return retStr

    def setDnDetailNum(self):
        """
        Refresh the per-type datanode counters from self.datanodes.
        """
        self.dataNum = self.getDnNum(MASTER_INSTANCE)
        self.standbyDnNum = self.getDnNum(STANDBY_INSTANCE)
        self.dummyStandbyDnNum = self.getDnNum(DUMMY_STANDBY_INSTANCE)

    def getDnNum(self, dntype):
        """
        Count the datanodes of the given instance type.

        Args:
            dntype (int): one of the instance type constants
                (MASTER_INSTANCE/STANDBY_INSTANCE/...)
        Returns:
            int: number of datanodes whose instanceType equals dntype
        """
        count = 0
        for dnInst in self.datanodes:
            if (dnInst.instanceType == dntype):
                count += 1
        return count

    def appendInstance(self, instId, mirrorId, instRole, instanceType,
                       listenIps=None, haIps=None, datadir="", ssddir="", level=1,
                       xlogdir="", syncNum=-1, syncNumFirst="", dcf_data="", float_ips=None):
        """
        Create an instanceInfo for the given role and append it to the
        matching per-role list, assigning ports from the base-port tables.

        Args:
            instId (int): instance id
            mirrorId (int): mirror group id
            instRole (int): INSTANCE_ROLE_* constant
            instanceType (int): MASTER_INSTANCE/STANDBY_INSTANCE/... constant
            listenIps (list): listen IPs; empty list falls back to backIps
            haIps (list): HA IPs; empty list falls back to backIps
            datadir (str): data directory (ignored for cm_server/cm_agent,
                which live under cmDataDir)
            ssddir (str): ssd data directory (datanode only)
            level (int): cm_server level
            xlogdir (str): xlog directory (datanode only)
            syncNum (int): datanode sync replica count
            syncNumFirst (str): datanode first-sync setting
            dcf_data (str): datanode dcf data path
            float_ips (list): floating IPs for the instance
        Raises:
            Exception: when datadir collides with an existing instance
        """
        if not self.__checkDataDir(datadir, instRole):
            raise Exception(ErrorCode.GAUSS_516["GAUSS_51638"] % \
                            self.name + " Data directory[%s] is "
                                        "conflicting." % datadir)

        dbInst = instanceInfo(instId, mirrorId)
        dbInst.hostname = self.name
        dbInst.datadir = os.path.realpath(datadir)

        # only datanodes keep a separate xlog directory
        if (instRole == INSTANCE_ROLE_DATANODE):
            dbInst.xlogdir = xlogdir
        else:
            dbInst.xlogdir = ""
        dbInst.instanceType = instanceType
        dbInst.instanceRole = instRole
        if (listenIps is not None):
            if (len(listenIps) == 0):
                dbInst.listenIps = self.backIps[:]
            else:
                dbInst.listenIps = listenIps[:]

        if float_ips is not None:
            if len(float_ips) != 0:
                dbInst.float_ips = float_ips

        if (haIps is not None):
            if (len(haIps) == 0):
                dbInst.haIps = self.backIps[:]
            else:
                dbInst.haIps = haIps[:]
        # cm_server
        if (instRole == INSTANCE_ROLE_CMSERVER):
            dbInst.datadir = os.path.join(self.cmDataDir, "cm_server")
            dbInst.port = self.__assignNewInstancePort(self.cmservers,
                                                       instRole, instanceType)
            dbInst.level = level
            dbInst.haPort = dbInst.port + 1
            self.cmservers.append(dbInst)
        # gtm
        elif (instRole == INSTANCE_ROLE_GTM):
            dbInst.port = self.__assignNewInstancePort(self.gtms, instRole,
                                                       instanceType)
            dbInst.haPort = dbInst.port + 1
            self.gtms.append(dbInst)
        # dn
        elif (instRole == INSTANCE_ROLE_DATANODE):
            dbInst.port = self.__assignNewInstancePort(self.datanodes,
                                                       instRole, instanceType)
            dbInst.haPort = dbInst.port + 1
            dbInst.ssdDir = ssddir
            dbInst.syncNum = syncNum
            dbInst.syncNumFirst = syncNumFirst
            dbInst.dcf_data_path = dcf_data
            self.datanodes.append(dbInst)
        # cm_agent (no port, see MASTER_BASEPORT_CMAGENT)
        elif (instRole == INSTANCE_ROLE_CMAGENT):
            dbInst.datadir = os.path.join(self.cmDataDir, "cm_agent")
            self.cmagents.append(dbInst)
        # etcd: port and haPort hold the peer and client ports
        elif (instRole == INSTANCE_ROLE_ETCD):
            dbInst.port = self.__assignNewInstancePort(self.etcds, instRole,
                                                       instanceType)
            dbInst.haPort = self.__assignNewInstancePort(self.etcds, instRole,
                                                         STANDBY_INSTANCE)
            self.etcds.append(dbInst)

    def __checkDataDir(self, datadir, instRole):
        """
        Check that datadir does not collide with any instance already
        registered on this node.

        Args:
            datadir (str): candidate data directory
            instRole (int): INSTANCE_ROLE_* constant; an empty datadir is
                only legal for cm_server/cm_agent (they derive it from
                cmDataDir)
        Returns:
            bool: True when the directory is usable
        """
        if (datadir == ""):
            return (
                    instRole == INSTANCE_ROLE_CMSERVER or instRole ==
                    INSTANCE_ROLE_CMAGENT)
        checkPathVaild(datadir)
        # cm_server
        for cmsInst in self.cmservers:
            if (cmsInst.datadir == datadir):
                return False
        # cn
        for cooInst in self.coordinators:
            if (cooInst.datadir == datadir):
                return False
        # dn
        for dataInst in self.datanodes:
            if (dataInst.datadir == datadir):
                return False
        # gtm
        for gtmInst in self.gtms:
            if (gtmInst.datadir == datadir):
                return False
        # etcd
        for etcd in self.etcds:
            if (etcd.datadir == datadir):
                return False
        # cm_agent
        for cmaInst in self.cmagents:
            if (cmaInst.datadir == datadir):
                return False

        return True

    def __assignNewInstancePort(self, instList, instRole, instanceType):
        """
        Assign the next free port for a new instance: the role's base port
        plus 2 for every existing instance of the same type.

        Args:
            instList (list): instances of the same role already on the node
            instRole (int): INSTANCE_ROLE_* constant (indexes the base-port
                tables)
            instanceType (int): MASTER_INSTANCE/STANDBY_INSTANCE/... constant
        Returns:
            int: the assigned port
        """
        port = 0
        # master instance
        if instanceType == MASTER_INSTANCE:
            port = self.masterBasePorts[instRole]
        # standby instance
        elif instanceType == STANDBY_INSTANCE:
            port = self.standbyBasePorts[instRole]
        # DB dummy standby instance
        elif instanceType == DUMMY_STANDBY_INSTANCE:
            port = self.dummyStandbyBasePort
        # cn and cm_agent instance: fixed base port, no step-up
        elif instanceType == INSTANCE_TYPE_UNDEFINED:
            port = self.masterBasePorts[instRole]
            return port
        for inst in instList:
            if (inst.instanceType == instanceType):
                port += 2

        return port
class dbClusterInfo():
    """
    Cluster info
    """

    def __init__(self, checkSctpPort=False):
        """
        function : Initialize an empty cluster description; the fields are
                   populated later from a static config file or an XML file
        input : boolean - whether SCTP ports should be checked
        output : NA
        """
        # identification and filesystem paths
        self.name = ""
        self.clusterName = ""
        self.appPath = ""
        self.logPath = ""
        self.xmlFile = ""
        self.toolPath = ""
        self.agentPath = ""
        self.agentLogPath = ""
        self.tmpPath = ""
        self.managerPath = ""
        # node containers and topology
        self.dbNodes = []
        self.newNodes = []
        self.clusterRings = []
        self.cmsFloatIp = ""
        # id generators for newly added instances, indexed by instance role
        self.__newInstanceId = [BASE_ID_CMSERVER, BASE_ID_GTM, BASE_ID_ETCD,
                                BASE_ID_COORDINATOR, BASE_ID_DATANODE,
                                BASE_ID_CMAGENT]
        self.__newDummyStandbyId = BASE_ID_DUMMYDATANODE
        self.__newMirrorId = 0
        self.__newGroupId = 0
        self.clusterType = CLUSTER_TYPE_SINGLE_INST
        self.checkSctpPort = checkSctpPort
        self.replicaNum = 0
        # network bookkeeping
        self.float_ips = {}
        self.ips_type = []
        self.cluster_back_ip1s = []
        self.node_num = 0
        # availability zone settings
        self.azName = ""
        self.cascadeRole = "off"
        # fields filled from the static config header
        self.version = 0
        self.installTime = 0
        self.localNodeId = 0
        self.nodeCount = 0
        # cluster properties and instance counters
        self.replicationCount = 0
        self.quorumMode = ""
        self.gtmcount = 0
        self.etcdcount = 0
        self.cmscount = 0
        self.cncount = 0
        self.masterDnCount = 0
        self.standbyDnCount = 0
        self.dummyStandbyDnCount = 0
        self.cm_state_list = []
        # dcf settings
        self.enable_dcf = ""
        self.dcf_config = ""
        # oGRecorder
        self.gr_nodes_list = ""
        self.grPath = ""

    def __str__(self):
        """
        function : Construct a printable string representation of a
        dbClusterInfo
        input : NA
        output : String
        """
        retStr = "ClusterName=%s,AppPath=%s,LogPath=%s,ClusterType=%s" % \
                 (self.name, self.appPath, self.logPath, self.clusterType)

        for dbNode in self.dbNodes:
            retStr += "\n%s" % str(dbNode)

        return retStr

    def initLogger(self, mode):
        """
        function : Initialize self.logger writing under <logPath>/om/gr_om
        input : String - log mode, passed through to CMLog
        output : NA
        """
        logPath = os.path.join(self.logPath, "om", "gr_om")
        # exist_ok avoids the check-then-create race when several processes
        # initialize logging at the same time (the old exists()+makedirs()
        # pair could raise FileExistsError).
        os.makedirs(logPath, exist_ok=True)
        self.logger = CMLog(logPath, "gr_om", mode)

    def check_conf_cm_state(self):
        """
        Save CM instance state
        """
        if not self.cm_state_list:
            return True
        state_result = self.cm_state_list[0]
        for state in self.cm_state_list[1:]:
            state_result ^= state
            if state_result:
                return False
        return True

    def __getDnRole(self, instanceType):
        """
        function : Map an instanceType code to its DN role letter
        input : Int
        output : String - "P"/"S"/"C"/"R", or "" for unknown types
        """
        roleByType = {
            MASTER_INSTANCE: "P",
            STANDBY_INSTANCE: "S",
            CASCADE_STANDBY: "C",
            DUMMY_STANDBY_INSTANCE: "R",
        }
        return roleByType.get(instanceType, "")

    def __getDnInstanceNum(self):
        dnInsNum = 0
        for dbNode in self.dbNodes:
            dnInsNum += len(dbNode.datanodes)
        return dnInsNum

    def __fprintContent(self, content, fileName):
        """
        function : Echo content to stdout and, when fileName is non-empty,
                   append it to that file as well
        input : String content, String file name ("" = stdout only)
        output : NA
        """
        if fileName != "":
            # Create the target safely before appending to it.
            createFileInSafeMode(fileName)
            with open(fileName, "a") as outFile:
                outFile.write(content)
                outFile.flush()
        sys.stdout.write(content)

    def __checkOsUser(self, user):
        """
        function : Check os user
        input : String
        output : NA
        """
        try:
            user = pwd.getpwnam(user).pw_gid
        except Exception as e:
            raise Exception(ErrorCode.GAUSS_503["GAUSS_50300"] % user)

    def __getStaticConfigFilePath(self, user, ignore_err=False):
        """
        function : Locate the static configuration file of a user
        input : String user, boolean ignore_err
        output : String - existing path, or '' when ignore_err is set and
                 no candidate exists
        """
        gaussHome = self.__getEnvironmentParameterValue("GAUSSHOME", user)
        if gaussHome == "":
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                            ("installation path of designated user [%s]" %
                             user))

        checkPathVaild(gaussHome)
        # If under upgrade, and the user chose the "chose strategy",
        # GAUSSHOME may point to a stale location, so also probe the
        # versioned app directory derived from the commit id.
        commitid = VersionInfo.getCommitid()
        appPath = "%s_%s" % (gaussHome, commitid)
        candidates = [
            "%s/bin/cluster_static_config" % os.path.realpath(gaussHome),
            "%s/bin/cluster_static_config" % appPath,
            "%s/bin/cluster_static_config_bak" % appPath,
        ]
        for configPath in candidates:
            if os.path.exists(configPath):
                return configPath
        if ignore_err:
            return ''
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                        ("static configuration file [%s] of "
                         "designated user [%s]" % (candidates[0], user)))

    def get_staic_conf_path(self, user, ignore_err=False):
        """
        function : Public wrapper around __getStaticConfigFilePath.
                   (The historical "staic" spelling is kept for callers.)
        input : String user, boolean ignore_err
        output : String
        """
        return self.__getStaticConfigFilePath(user, ignore_err)


    def __getEnvironmentParameterValue(self, environmentParameterName, user):
        """
        function : Read one environment variable by sourcing the user's
                   profile in a shell.
        !!!!Do not call this function in preinstall.py script.
        because we determine if we are using env separate version by the
        value of MPPDB_ENV_SEPARATE_PATH
        input : String,String
        output : String
        """
        # Resolve which profile file to source.
        mpprcFile = getEnvironmentParameterValue('MPPDB_ENV_SEPARATE_PATH', user)
        if mpprcFile:
            # Escape backslashes and double quotes before embedding the
            # path into a shell command line.
            userProfile = mpprcFile.replace("\\", "\\\\").replace('"', '\\"\\"')
            checkPathVaild(userProfile)
        else:
            userProfile = "~/.bashrc"
        # When running as root, switch to the target user first.
        if os.getuid() == 0:
            cmd = "su - %s -c 'source %s;echo $%s' 2>/dev/null" % (
                user, userProfile, environmentParameterName)
        else:
            cmd = "source %s;echo $%s 2>/dev/null" % (userProfile,
                                                      environmentParameterName)
        (status, output) = subprocess.getstatusoutput(cmd)
        if status != 0:
            raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"]
                            % cmd + " Error: \n%s" % output)
        # Only the first output line carries the value.
        env_path = output.split("\n")[0]
        checkPathVaild(env_path)
        return env_path

    def __getStatusByOM(self, user):
        """
        function : Query detailed cluster status through the gr_om tool
        input : String - OS user that owns the cluster
        output : [] - non-empty lines of "gr_om -t status --detail" output
        """
        # get mpprc file
        mpprcFile = EnvUtil.getEnvironmentParameterValue('MPPDB_ENV_SEPARATE_PATH', user)
        if mpprcFile is not None and mpprcFile != "":
            mpprcFile = mpprcFile.replace("\\", "\\\\").replace('"', '\\"\\"')
            checkPathVaild(mpprcFile)
            userProfile = mpprcFile
        else:
            userProfile = ClusterConstants.BASHRC
        # build shell command; as root the command must run as the cluster
        # user via su.
        if os.getuid() == 0:
            # Bugfix: the inner command string was missing its closing
            # single quote, producing a malformed shell command.
            cmd = "su - %s -c 'source %s;gr_om -t status --detail'" % (
                user, userProfile)
        else:
            cmd = "source %s;gr_om -t status --detail" % (userProfile)
        (status, output) = subprocess.getstatusoutput(cmd)
        if status != 0:
            raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"]
                            % cmd + " Error: \n%s" % output)
        return [i for i in output.strip().split("\n") if i]

    def __readStaticConfigFile(self, staticConfigFile, user, isLCCluster=False,
                               ignoreLocalEnv=False):
        """
        function : read cluster information from static configuration file
        input : String - path of the static config file
                String - OS user owning the cluster
                boolean - LC cluster flag, passed to the node unpacker
                boolean - skip validation of the local env variables
        output : NA - fills self.version/installTime/localNodeId/nodeCount
                 and self.dbNodes; raises on any parse failure
        """
        fp = None
        try:
            # get env parameter
            gauss_env = self.__getEnvironmentParameterValue("GAUSS_ENV", user)
            self.name = self.__getEnvironmentParameterValue("GS_CLUSTER_NAME",
                                                            user)
            self.appPath = self.__getEnvironmentParameterValue("GAUSSHOME",
                                                               user)
            logPathWithUser = self.__getEnvironmentParameterValue("GAUSSLOG",
                                                                  user)

            if not ignoreLocalEnv:
                # GAUSS_ENV == "2" presumably marks a fully installed
                # cluster, which must have a name -- TODO confirm.
                if gauss_env == "2" and self.name == "":
                    raise Exception(ErrorCode.GAUSS_503["GAUSS_50300"]
                                    % ("cluster name of designated user"
                                       " [%s]" % user))
                if self.appPath == "":
                    raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                                    ("installation path of designated user "
                                     "[%s]" % user))
                if logPathWithUser == "":
                    raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                                    ("log path of designated user [%s]" %
                                     user))

            splitMark = "/%s" % user
            # set log path without user
            # find the path from right to left
            self.logPath = logPathWithUser[
                           0:(logPathWithUser.rfind(splitMark))]
            staticConfigFilePath = os.path.split(staticConfigFile)[0]
            versionFile = os.path.join(
                staticConfigFilePath, "upgrade_version")
            version, number, commitid = VersionInfo.get_version_info(
                versionFile)
            try:
                # read static_config_file
                fp = open(staticConfigFile, "rb")
                # Header layout differs by release: up to 92.200 the CRC
                # is a signed 8-byte value ("=q..."), later releases use
                # an unsigned 4-byte CRC ("=I...").
                if float(number) <= 92.200:
                    info = fp.read(32)
                    (crc, lenth, version, currenttime, nodeNum,
                     localNodeId) = struct.unpack("=qIIqiI", info)
                else:
                    info = fp.read(28)
                    (crc, lenth, version, currenttime, nodeNum,
                     localNodeId) = struct.unpack("=IIIqiI", info)
                self.version = version
                self.installTime = currenttime
                self.localNodeId = localNodeId
                self.nodeCount = nodeNum
            except Exception as e:
                if fp:
                    fp.close()
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                                % staticConfigFile + " Error:\n" + str(e))
            # The version field encodes the cluster type in bands of 100.
            if version <= 100:
                raise Exception(ErrorCode.GAUSS_516["GAUSS_51637"] % \
                                ("cluster static config version[%s]" % version,
                                 "the new version[%s]" % BIN_CONFIG_VERSION))
            elif version >= 101 and version <= 200:
                self.clusterType = CLUSTER_TYPE_SINGLE
                if BIN_CONFIG_VERSION_SINGLE != version:
                    raise Exception(ErrorCode.GAUSS_516["GAUSS_51637"] % \
                                    ("cluster static config version[%s]"
                                     % version, "the new version[%s]"
                                     % BIN_CONFIG_VERSION_SINGLE))
            elif version >= 201 and version <= 300:
                # single primary multi standy
                self.clusterType = CLUSTER_TYPE_SINGLE_PRIMARY_MULTI_STANDBY
                if (BIN_CONFIG_VERSION_SINGLE_PRIMARY_MULTI_STANDBY
                        != version):
                    raise Exception(
                        ErrorCode.GAUSS_516["GAUSS_51637"]
                        % ("cluster static config version[%s]" % version,
                           "the new version[%s]"
                           % BIN_CONFIG_VERSION_SINGLE_PRIMARY_MULTI_STANDBY))
            elif version >= 301 and version <= 400:
                # single inst
                self.clusterType = CLUSTER_TYPE_SINGLE_INST
                if BIN_CONFIG_VERSION_SINGLE_INST != version:
                    raise Exception(ErrorCode.GAUSS_516["GAUSS_51637"] % \
                                    ("cluster static config version[%s]"
                                     % version, "the new version[%s]"
                                     % BIN_CONFIG_VERSION_SINGLE_INST))

            self.dbNodes = []
            try:
                for i in range(nodeNum):
                    # Node records start on PAGE_SIZE boundaries; seek to
                    # the beginning of the next page before unpacking.
                    offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
                    fp.seek(offset)
                    dbNode = self.__unPackNodeInfo(fp, number, isLCCluster)
                    self.dbNodes.append(dbNode)
                fp.close()
            except Exception as e:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                                staticConfigFile + " Error:\nThe content is "
                                                   "not correct.")
        except Exception as e:
            if (fp):
                fp.close()
            raise Exception(str(e))

    def __unPackNodeInfo(self, fp, number, isLCCluster=False):
        """
        function : unpack a node config info
        input : file - static config file positioned at a node record
                number - release number string used to pick the CRC layout
                isLCCluster - skip CMS/agent/GTM/ETCD sections when True
        output : Object - a populated dbNodeInfo
        """
        # Releases up to 92.200 store the record CRC as a signed 8-byte
        # value ("=q"); later releases use an unsigned 4-byte CRC ("=I").
        if float(number) <= 92.200:
            info = fp.read(76)
            (crc, nodeId, nodeName) = struct.unpack("=qI64s", info)
        else:
            info = fp.read(72)
            (crc, nodeId, nodeName) = struct.unpack("=II64s", info)
        # Fixed-width strings in the file are NUL-padded.
        nodeName = nodeName.decode().strip('\x00')
        dbNode = dbNodeInfo(nodeId, nodeName)
        info = fp.read(68)
        (azName, azPriority) = struct.unpack("=64sI", info)
        dbNode.azName = azName.decode().strip('\x00')
        dbNode.azPriority = azPriority

        # get backIps
        self.__unPackIps(fp, dbNode.backIps)
        # get sshIps
        self.__unPackIps(fp, dbNode.sshIps)
        if (not isLCCluster):
            # get cm_server information
            self.__unPackCmsInfo(fp, dbNode)
            # get cm_agent information
            self.__unpackAgentInfo(fp, dbNode)
            # get gtm information
            self.__unpackGtmInfo(fp, dbNode)
            # skip 404 reserved bytes between the GTM and DN sections
            # (presumably legacy fields -- TODO confirm with writer side)
            info = fp.read(404)
        # get DB information
        self.__unpackDataNode(fp, dbNode)
        if (not isLCCluster):
            # get etcd information
            self.__unpackEtcdInfo(fp, dbNode)
            # skip 8 reserved trailing bytes
            info = fp.read(8)
        # set DB azName for OLAP
        for inst in dbNode.datanodes:
            inst.azName = dbNode.azName
            inst.azPriority = dbNode.azPriority

        return dbNode

    def __unpackEtcdInfo(self, fp, dbNode):
        """
        function : unpack the info of etcd
        input : file, Object - node record being filled
        output : NA
        """
        etcdInst = instanceInfo()
        etcdInst.instanceRole = INSTANCE_ROLE_ETCD
        etcdInst.hostname = dbNode.name
        etcdInst.instanceType = INSTANCE_TYPE_UNDEFINED
        # etcdNum(4) + instanceId(4) + mirrorId(4) + hostname(64) +
        # datadir(1024) = 1100 bytes
        info = fp.read(1100)
        (etcdNum, etcdInst.instanceId, etcdInst.mirrorId, etcdhostname,
         etcdInst.datadir) = struct.unpack("=IIi64s1024s", info)
        etcdInst.datadir = etcdInst.datadir.decode().strip('\x00')
        self.__unPackIps(fp, etcdInst.listenIps)
        info = fp.read(4)
        (etcdInst.port,) = struct.unpack("=I", info)
        self.__unPackIps(fp, etcdInst.haIps)
        info = fp.read(4)
        (etcdInst.haPort,) = struct.unpack("=I", info)
        # etcdNum == 1 marks a node that actually hosts an ETCD instance.
        if (etcdNum == 1):
            dbNode.etcdNum = 1
            dbNode.etcds.append(etcdInst)
            self.etcdcount += 1
        else:
            dbNode.etcdNum = 0
            dbNode.etcds = []

    def __unPackIps(self, fp, ips):
        """
        function : Read one fixed-size IP array slot from the static
                   config file; decoded addresses are appended in place
        input : file, [] - target list
        output : NA
        """
        (ipCount,) = struct.unpack("=i", fp.read(4))
        for _ in range(ipCount):
            (rawIp,) = struct.unpack("=128s", fp.read(128))
            ips.append(str(rawIp.decode().strip('\x00').strip()))
        # Consume the unused remainder of the fixed-length slot array.
        fp.read(128 * (MAX_IP_NUM - ipCount))

    def __unPackCmsInfo(self, fp, dbNode):
        """
        function : Unpack the info of CMserver
        input : file Object - node record being filled
        output : NA
        """
        cmsInst = instanceInfo()
        cmsInst.instanceRole = INSTANCE_ROLE_CMSERVER
        cmsInst.hostname = dbNode.name
        # instanceId(4) + mirrorId(4) + cmDataDir(1024) + level(4) +
        # floatIp(128) = 1164 bytes
        info = fp.read(1164)
        (cmsInst.instanceId, cmsInst.mirrorId, dbNode.cmDataDir, cmsInst.level,
         self.cmsFloatIp) = struct.unpack("=II1024sI128s", info)
        dbNode.cmDataDir = dbNode.cmDataDir.decode().strip('\x00')
        self.cmsFloatIp = self.cmsFloatIp.decode().strip('\x00')
        cmsInst.datadir = "%s/cm_server" % dbNode.cmDataDir
        self.__unPackIps(fp, cmsInst.listenIps)
        info = fp.read(4)
        (cmsInst.port,) = struct.unpack("=I", info)
        self.__unPackIps(fp, cmsInst.haIps)
        info = fp.read(8)
        (cmsInst.haPort, cmsInst.instanceType) = struct.unpack("=II", info)
        # Only a master CMS contributes to the node's cmsNum.
        if (cmsInst.instanceType == MASTER_INSTANCE):
            dbNode.cmsNum = 1
        elif (cmsInst.instanceType == STANDBY_INSTANCE):
            dbNode.cmsNum = 0
        else:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51204"]
                            % ("CMServer", cmsInst.instanceType))
        # Skip reserved bytes (presumably an unused port + IP array slot
        # -- TODO confirm against the writer side).
        info = fp.read(4 + 128 * MAX_IP_NUM + 4)

        # instanceId == 0 means this node hosts no cm_server.
        if (cmsInst.instanceId):
            dbNode.cmservers.append(cmsInst)
            self.cmscount += 1
        else:
            dbNode.cmservers = []

    def __unpackAgentInfo(self, fp, dbNode):
        """
        function : Unpack one cm_agent record. It should be called after
        __unPackCmsInfo, because dbNode.cmDataDir gets its value in
        __unPackCmsInfo.
        input : file Object
        output : NA
        """
        agentInst = instanceInfo()
        agentInst.instanceRole = INSTANCE_ROLE_CMAGENT
        agentInst.hostname = dbNode.name
        agentInst.instanceType = INSTANCE_TYPE_UNDEFINED
        # instanceId(4, unsigned) + mirrorId(4, signed)
        (agentInst.instanceId,
         agentInst.mirrorId) = struct.unpack("=Ii", fp.read(8))
        self.__unPackIps(fp, agentInst.listenIps)
        agentInst.datadir = "%s/cm_agent" % dbNode.cmDataDir
        dbNode.cmagents.append(agentInst)

    def __unpackGtmInfo(self, fp, dbNode):
        """
        function : Unpack the info of gtm
        input : file Object - node record being filled
        output : NA
        """
        gtmInst = instanceInfo()
        gtmInst.instanceRole = INSTANCE_ROLE_GTM
        gtmInst.hostname = dbNode.name
        # instanceId(4) + mirrorId(4) + gtmNum(4) + datadir(1024) = 1036
        info = fp.read(1036)
        (gtmInst.instanceId, gtmInst.mirrorId, gtmNum,
         gtmInst.datadir) = struct.unpack("=III1024s", info)
        gtmInst.datadir = gtmInst.datadir.decode().strip('\x00')
        self.__unPackIps(fp, gtmInst.listenIps)
        info = fp.read(8)
        (gtmInst.port, gtmInst.instanceType) = struct.unpack("=II", info)
        if (gtmInst.instanceType == MASTER_INSTANCE):
            dbNode.gtmNum = 1
        elif (gtmInst.instanceType == STANDBY_INSTANCE):
            dbNode.gtmNum = 0
        else:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51204"] % (
                "GTM", gtmInst.instanceType))
        self.__unPackIps(fp, gtmInst.haIps)
        info = fp.read(4)
        (gtmInst.haPort,) = struct.unpack("=I", info)
        # Skip reserved bytes (presumably an unused path + port + IP array
        # slot -- TODO confirm against the writer side).
        info = fp.read(1024 + 4 + 128 * MAX_IP_NUM + 4)

        # gtmNum == 1 marks a node that actually hosts a GTM instance.
        if (gtmNum == 1):
            dbNode.gtms.append(gtmInst)
            self.gtmcount += 1
        else:
            dbNode.gtms = []

    def __unpackDataNode(self, fp, dbNode):
        """
        function : Unpack the info of datanode
        input : file Object - node record being filled
        output : NA
        """
        info = fp.read(4)
        (dataNodeNums,) = struct.unpack("=I", info)
        dbNode.dataNum = 0

        dbNode.datanodes = []
        for i in range(dataNodeNums):
            dnInst = instanceInfo()
            dnInst.instanceRole = INSTANCE_ROLE_DATANODE
            dnInst.hostname = dbNode.name
            # In the upgrade scenario, there are two different read methods
            # for static config file.
            # First, use the new read mode, and judge that if the new read
            # mode is not correct,
            # then rollback by fp.seek(), and exchange its(xlogdir) value
            # with ssddir.
            info = fp.read(2056)
            (dnInst.instanceId, dnInst.mirrorId, dnInst.datadir,
             dnInst.xlogdir) = struct.unpack("=II1024s1024s", info)
            dnInst.datadir = dnInst.datadir.decode().strip('\x00')
            dnInst.xlogdir = dnInst.xlogdir.decode().strip('\x00')

            info = fp.read(1024)
            (dnInst.ssdDir) = struct.unpack("=1024s", info)
            dnInst.ssdDir = dnInst.ssdDir[0].decode().strip('\x00')
            # if notsetXlog,ssdDir should not be null.use by upgrade.
            if dnInst.ssdDir != "" and dnInst.ssdDir[0] != '/':
                fp.seek(fp.tell() - 1024)
                dnInst.ssdDir = dnInst.xlogdir
                dnInst.xlogdir = ""

            self.__unPackIps(fp, dnInst.listenIps)
            info = fp.read(8)
            (dnInst.port, dnInst.instanceType) = struct.unpack("=II", info)
            if (dnInst.instanceType == MASTER_INSTANCE):
                dbNode.dataNum += 1
            elif (dnInst.instanceType in [STANDBY_INSTANCE,
                                          DUMMY_STANDBY_INSTANCE, CASCADE_STANDBY]):
                pass
            else:
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51204"]
                                % ("DN", dnInst.instanceType))
            self.__unPackIps(fp, dnInst.haIps)
            info = fp.read(4)
            (dnInst.haPort,) = struct.unpack("=I", info)
            if (
                    self.clusterType ==
                    CLUSTER_TYPE_SINGLE_PRIMARY_MULTI_STANDBY or
                    self.clusterType == CLUSTER_TYPE_SINGLE_INST):
                # One peer record per possible standby replica.
                maxStandbyCount = MIRROR_COUNT_REPLICATION_MAX - 1
                for j in range(maxStandbyCount):
                    peerDbInst = peerInstanceInfo()
                    info = fp.read(1024)
                    (peerDbInst.peerDataPath,) = struct.unpack("=1024s", info)
                    peerDbInst.peerDataPath = \
                        peerDbInst.peerDataPath.decode().strip('\x00')
                    self.__unPackIps(fp, peerDbInst.peerHAIPs)
                    info = fp.read(8)
                    (peerDbInst.peerHAPort,
                     peerDbInst.peerRole) = struct.unpack("=II", info)
                    dnInst.peerInstanceInfos.append(peerDbInst)
            else:
                peerDbInst = peerInstanceInfo()
                info = fp.read(1024)
                (peerDbInst.peerDataPath,) = struct.unpack("=1024s", info)
                peerDbInst.peerDataPath = \
                    peerDbInst.peerDataPath.decode().strip('\x00')
                self.__unPackIps(fp, peerDbInst.peerHAIPs)
                info = fp.read(8)
                (peerDbInst.peerHAPort, peerDbInst.peerRole) = \
                    struct.unpack("=II", info)
                info = fp.read(1024)
                (peerDbInst.peerData2Path,) = struct.unpack("=1024s", info)
                # Bugfix: decode the freshly unpacked peerData2Path bytes.
                # The old code re-decoded peerDataPath, which is already a
                # str at this point, so str.decode() raised AttributeError
                # in Python 3 and peerData2Path was never set correctly.
                peerDbInst.peerData2Path = \
                    peerDbInst.peerData2Path.decode().strip('\x00')
                self.__unPackIps(fp, peerDbInst.peer2HAIPs)
                info = fp.read(8)
                (peerDbInst.peer2HAPort, peerDbInst.peer2Role) = \
                    struct.unpack("=II", info)
                dnInst.peerInstanceInfos.append(peerDbInst)
            dbNode.datanodes.append(dnInst)

    def setInstId(self, instList, nodeIdInstIdDict, newNodeId, newInstId):
        """
        instList                  instance list
        nodeIdInstIdDict          node id and instance id dict
        newNodeId                 new node id
        newInstId                 new instance id
        
        """
        for inst in instList:
            if (newNodeId in list(nodeIdInstIdDict.keys())):
                inst.instanceId = nodeIdInstIdDict[newNodeId]
            # the New agent instance
            else:
                inst.instanceId = newInstId
                newInstId += 1
        return newInstId

    def refreshInstIdByInstType(self, oldNodesList, newNodesList,
                                instType="cmagent"):
        """
        """
        nodeIdInstanceIdDict = {}
        # get the node id and cmagent/cmserver/gtm/etcd/cn instance id dict
        for oldNode in oldNodesList:
            if (instType == "cmagent"):
                for cmaInst in oldNode.cmagents:
                    nodeIdInstanceIdDict[oldNode.id] = cmaInst.instanceId
            elif (instType == "cmserver"):
                for cmsInst in oldNode.cmservers:
                    nodeIdInstanceIdDict[oldNode.id] = cmsInst.instanceId
            elif (instType == "gtm"):
                for gtmInst in oldNode.gtms:
                    nodeIdInstanceIdDict[oldNode.id] = gtmInst.instanceId
            elif (instType == "etcd"):
                for etcdInst in oldNode.etcds:
                    nodeIdInstanceIdDict[oldNode.id] = etcdInst.instanceId
            elif (instType == "cn"):
                for cnInst in oldNode.coordinators:
                    # warm-standby: the number of nodes is same,so refrush
                    # by id
                    # addcn out cluster:refrush by id or nodename
                    # addcn in cluster:refrush by id or nodename
                    # deletecn out cluster:refrush by nodename
                    # deletecn in cluster:refrush by id or nodename
                    # expand:refrush by id or nodename
                    # shink in tail:refrush by id or nodename
                    # shink in mid:refrush by nodename
                    if (len(oldNodesList) == len(newNodesList)):
                        nodeIdInstanceIdDict[oldNode.id] = cnInst.instanceId
                    else:
                        nodeIdInstanceIdDict[oldNode.name] = cnInst.instanceId

        # sort instance id lists and set newInstId = the max ID num + 1
        instIDList = list(nodeIdInstanceIdDict.values())
        instIDList.sort()
        if (len(instIDList) > 0):
            newInstId = instIDList[-1] + 1
        else:
            newInstId = 1

        # refresh instance id by oldClusterInfo
        for newNode in newNodesList:
            if (instType == "cmagent"):
                newInstId = self.setInstId(newNode.cmagents,
                                           nodeIdInstanceIdDict, newNode.id,
                                           newInstId)
            elif (instType == "cmserver"):
                newInstId = self.setInstId(newNode.cmservers,
                                           nodeIdInstanceIdDict, newNode.id,
                                           newInstId)
            elif (instType == "gtm"):
                newInstId = self.setInstId(newNode.gtms, nodeIdInstanceIdDict,
                                           newNode.id, newInstId)
            elif (instType == "etcd"):
                newInstId = self.setInstId(newNode.etcds, nodeIdInstanceIdDict,
                                           newNode.id, newInstId)
            elif (instType == "cn"):
                if (len(oldNodesList) == len(newNodesList)):
                    newInstId = self.setInstId(newNode.coordinators,
                                               nodeIdInstanceIdDict,
                                               newNode.id, newInstId)
                else:
                    newInstId = self.setInstId(newNode.coordinators,
                                               nodeIdInstanceIdDict,
                                               newNode.name, newInstId)

    def __check_cms_config(self):
        """
        Check cm_server config: raises when cm_server instances are
        configured on too few nodes or in too small a number.
        """

        # NOTE(review): the message says "three or more nodes" but the
        # condition only rejects fewer than 2 nodes -- confirm which side
        # reflects the intended minimum.
        if self.cmscount > 0 and len(self.dbNodes) < 2:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] +
                                "The cm_server instance can be "
                                "configured only on three or more nodes.")
        # NOTE(review): the message asks for at least three cm_server
        # instances but the condition only rejects a count of exactly 1 --
        # confirm the intended threshold.
        if 0 < self.cmscount < 2:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] +
                            "At least three cm_server instances are required.")


    def checkXMLFile(self, xml_file):
        """
        function : check XML contain DTDs
        input : String
        output : NA
        """
        # Check xml for security requirements
        # if it have "<!DOCTYPE" or it have "<!ENTITY",
        # exit and print "File have security risks."
        try:
            with open(xml_file, "r", encoding='utf-8') as fb:
                lines = fb.readlines()
            for line in lines:
                if re.findall("<!DOCTYPE", line) or re.findall("<!ENTITY", line):
                    raise Exception("File have security risks.")
        except Exception as e:
            raise Exception(str(e))

    def initParserXMLFile(self, xml_file_path):
        """
        function : Parse an XML config file (after the DTD/entity security
                   check) and return its root element
        input : String - path of the XML file
        output : Object - root element of the parsed tree
        """
        try:
            # Refuse files containing DTDs/entities before parsing them.
            self.checkXMLFile(xml_file_path)
            root_node = ETree.parse(xml_file_path).getroot()
        except Exception as e:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] + " Error: \n%s." % str(e))

        return root_node
    @staticmethod
    def readOneClusterConfigItem(root_node, para_name, input_element_name,
                                 nodeName=""):
        """
        function : Read one cluster configuration item
        input : Object,String,String
        output : String,String - (status, value); status 0 = found,
                 2 = not found
        """
        level = input_element_name.upper()
        # Node-level lookups are scoped to a single DEVICE, so the caller
        # must supply the node's name.
        if level == 'NODE' and nodeName == "":
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51201"] + \
                            " Need node name for node configuration level.")

        if level == 'CLUSTER':
            cluster_elements = root_node.findall('CLUSTER')
            if not cluster_elements:
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51200"] % level)
            param_nodes = cluster_elements[0].findall('PARAM')
            #ClusterConfigFile.validate_param_names_in_cluster(param_nodes)
            return dbClusterInfo.findParamInCluster(para_name, param_nodes)

        if level == 'NODE':
            device_lists = root_node.findall('DEVICELIST')
            if not device_lists:
                raise Exception(
                    ErrorCode.GAUSS_512["GAUSS_51200"] % 'DEVICELIST')
            devices = device_lists[0].findall('DEVICE')
            #ClusterConfigFile.validate_param_names_in_devicelist(devices)
            return dbClusterInfo.findParamByName(nodeName, para_name, devices)

        # Any other element name is not a recognized configuration level.
        raise Exception(ErrorCode.GAUSS_512["GAUSS_51200"] % level)

    def findParamInCluster(para_name, node_array):
        """
        function : Find parameter in cluster
        input : String,[]
        output : String,String - (0, value) when the PARAM named
                 *para_name* exists, otherwise (2, "")
        """
        # First matching PARAM wins.
        for param_node in node_array:
            if param_node.attrib['name'] == para_name:
                return 0, str(param_node.attrib['value'])
        return 2, ""

    def findParamByName(node_name, para_name, device_node):
        """
        function : Find parameter by name
        input : String,String,Object
        output : String,String - (0, value) when *para_name* is found on
                 the DEVICE whose name PARAM equals *node_name*,
                 otherwise (2, "")
        """
        found_status = 2
        found_value = ""
        for device in device_node:
            params = device.findall('PARAM')
            # A DEVICE belongs to the requested node when one of its PARAM
            # entries is name="name" with the node's name as its value.
            for marker in params:
                if marker.attrib['name'] != 'name':
                    continue
                if marker.attrib['value'] != node_name:
                    continue
                for candidate in params:
                    attr_name = candidate.attrib['name']
                    if attr_name != para_name:
                        continue
                    found_status = 0
                    found_value = str(candidate.attrib['value'].strip())
                    # Path-like values (xxxDir / dataNodeN) are normalized.
                    is_path = (attr_name.find("Dir") > 0 or
                               attr_name.find("dataNode") == 0)
                    if is_path and found_value != "":
                        found_value = os.path.normpath(found_value)
        return found_status, found_value

    def initFromXml(self, xmlFile):
        """
        function : Init cluster from xml config file
        input : xmlFile - path of the xml configuration file
        output : dict - {node name: [ips, ports]} as returned by the
                 instance port/ip check
        raises : Exception when the file is missing, cannot be parsed, or
                 any configuration check fails
        """
        if (not os.path.exists(xmlFile)):
            raise Exception("XML configuration file not exist")

        self.xmlFile = xmlFile

        # Export the config file path so child processes (the readcluster
        # command) can read it.  Assigning through os.environ also calls
        # putenv() under the hood while keeping os.environ consistent for
        # this process; a bare os.putenv() would NOT update os.environ.
        os.environ[ENV_CLUSTERCONFIG] = xmlFile
        # parse xml file; the root element is shared module-wide through
        # the xmlRootNode global.
        global xmlRootNode
        try:
            xmlRootNode = self.initParserXMLFile(xmlFile)
        except Exception as e:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51234"]
                            % xmlFile + " Error:\n%s" % str(e))

        self.__readClusterGlobalInfo()
        self.get_cluster_back_ip1s()
        # "single-inst-one-node" clusters use a dedicated node reader.
        if self.__read_and_check_config_item(xmlRootNode, "clusterType", "cluster", True) == \
                "single-inst-one-node":
            self.__read_cluster_node_info_for_one()
        else:
            self.__readClusterNodeInfo()
        self.__checkAZForSingleInst()
        IpPort = self.__checkInstancePortandIP()
        self.__check_cms_config()
        return IpPort

    def __read_cluster_node_info_for_one(self):
        """
        function : Read cluster node info for a "single-inst-one-node"
                   cluster: one node hosting all datanode instances plus
                   one cm_agent.
        input : NA
        output : NA
        """
        # read cluster node info.
        (_, node_name) = self.readOneClusterConfigItem(xmlRootNode,
                                                                    "nodeNames",
                                                                    "cluster")
        # Exactly one node name may be configured, and it must match the
        # single DEVICE entry in <DEVICELIST>.
        if [node_name] != self.__getAllHostnamesFromDEVICELIST():
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] +
                            " The number of nodeNames and DEVICE are not same.")
        if (not self.__isIpValid(self.cluster_back_ip1s[0])):
            raise Exception(ErrorCode.GAUSS_506["GAUSS_50603"] + \
                            "The IP address is: %s." % self.cluster_back_ip1s[0] + " Please "
                                                                                 "check it.")
        # Get basic info of node: name, ip and master instance number etc.
        self.dbNodes = []
        db_node = dbNodeInfo(1, node_name)
        self.__readNodeBasicInfo(db_node, [node_name])
        self.dbNodes.append(db_node)
        # Get datanode info
        # dataNode<i> / dataPortBase<i> (1-based) hold each instance's data
        # directory and port; instance 0 is the primary, the rest standbys.
        for i in range(db_node.dataNum):
            db_inst = instanceInfo(BASE_ID_DATANODE + i, 1)
            db_inst.hostname = node_name
            db_inst.datadir = self.__readNodeStrValue(node_name, "dataNode%s" % (i+1))
            db_inst.instanceType = MASTER_INSTANCE if i == 0 else STANDBY_INSTANCE
            db_inst.instanceRole = INSTANCE_ROLE_DATANODE
            db_inst.listenIps = db_node.backIps[:]
            db_inst.haIps = db_node.backIps[:]
            db_inst.port = self.__readNodeIntValue(node_name, "dataPortBase%s" % (i+1))
            # ha port is the data port + 1 by convention in this file
            db_inst.haPort = db_inst.port + 1
            db_inst.ssdDir = ""
            # -1 / "" mark the sync settings as "not configured" here
            db_inst.syncNum = -1
            db_inst.syncNumFirst = ""
            db_inst.azName = db_node.azName
            db_inst.azPriority = db_node.azPriority
            db_inst.ssh_port = db_node.ssh_port
            self.dbNodes[0].datanodes.append(db_inst)
        # The single node also runs one cm_agent instance.
        self.dbNodes[0].appendInstance(1, MIRROR_ID_AGENT, INSTANCE_ROLE_CMAGENT,
                                       INSTANCE_TYPE_UNDEFINED, [], None, "")

    def getClusterNodeNames(self):
        """
        function : Get the cluster's node names.
        input : NA
        output : NA
        """
        return [dbNode.name for dbNode in self.dbNodes]

    def getClusterNodeIds(self):
        """
        function : Get the cluster's node names.
        input : NA
        output : NA
        """
        return [dbNode.id for dbNode in self.dbNodes]

    def get_cluster_node_ssh_port_by_ip(self, ip):
        """
        function : Get the cluster's node ssh port by ip.
        input : NA
        output : NA
        """
        for node in self.dbNodes:
            if ip == node.sshIps[0]:
                return node.ssh_port
        return 22
            
    def get_cluster_nodes_ssh_port_by_ips(self, ips):
        """
        function : Get the cluster's node ssh port by ips.
        input : NA
        output : NA
        """
        ssh_ports_map = {}
        for ip in ips:
            ssh_port = self.get_cluster_node_ssh_port_by_ip(ip)
            ssh_ports_map[ip] = ssh_port
        return ssh_ports_map

    def getdataNodeInstanceType(self, nodeId=-1):
        """
        function: get the dataNode's instanceType
        input:  NA
        output: NA
        """
        for dbNode in self.dbNodes:
            if nodeId == dbNode.id:
                for dataNode in dbNode.datanodes:
                    return dataNode.instanceType

    def getHostNameByNodeId(self, nodeId=-1):
        """
        function: get the dataNode's name by nodeId
        input:  NA
        output: NA
        """
        for dbNode in self.dbNodes:
            if nodeId == dbNode.id:
                return dbNode.name

    def get_cluster_directory_dict(self):
        """
        function : Get cluster all directorys
        input : NA
        output : List
        """
        cluster_dirs = dict()
        cluster_dirs["appPath"] = [self.appPath]
        cluster_dirs["logPath"] = [self.logPath]
        # get cluster all directorys
        for db_node in self.dbNodes:
            # including cm_server, cm_agent, cn, dn, gtm, etcd, ssd
            cn_dict =  dict(data_dir="", ssd="")
            dn_dict = dict(data_dir=list(), ssd=list(), xlog_dir=list())
            node_dict = dict(cm_server="", cm_agent="", cn=cn_dict,
                             dn=dn_dict, gtm="", etcd="", ssd="")
            if db_node.cmservers:
                node_dict["cm_server"] = db_node.cmservers[0].datadir
            if db_node.cmagents:
                node_dict["cm_agent"] = db_node.cmagents[0].datadir
            if db_node.gtms:
                node_dict["gtm"] = db_node.gtms[0].datadir
            if db_node.coordinators:
                node_dict["cn"]["data_dir"] = db_node.coordinators[0].datadir
                if db_node.coordinators[0].ssdDir:
                    node_dict["cn"]["ssd"] = db_node.coordinators[0].ssdDir
            for dbInst in db_node.datanodes:
                node_dict["dn"]["data_dir"].append(dbInst.datadir)
                node_dict["dn"]["xlog_dir"].append(dbInst.xlogdir)
                if dbInst.ssdDir:
                    node_dict["dn"]["ssd"].append(dbInst.ssdDir)
            if db_node.etcds:
                node_dict["etcd"] = db_node.etcds[0].datadir
            cluster_dirs[db_node.name] = node_dict
        return cluster_dirs


    def getClusterDirectorys(self, hostName="", ignore=True):
        """
        function : Get cluster all directorys
        input : NA
        output : List
        """
        clusterDirs = {}
        clusterDirs["appPath"] = [self.appPath]
        if (ignore):
            clusterDirs["logPath"] = [self.logPath]
        # get cluster all directorys
        for dbNode in self.dbNodes:
            nodeName = dbNode.name
            if (hostName != ""):
                if (hostName != nodeName):
                    continue
            nodeDirs = []
            # including cm_server, cm_agent, cn, dn, gtm, etcd, ssd
            nodeDirs.append(dbNode.cmDataDir)
            for dbInst in dbNode.cmservers:
                nodeDirs.append(dbInst.datadir)
            for dbInst in dbNode.cmagents:
                nodeDirs.append(dbInst.datadir)
            for dbInst in dbNode.gtms:
                nodeDirs.append(dbInst.datadir)
            for dbInst in dbNode.coordinators:
                nodeDirs.append(dbInst.datadir)
                if (len(dbInst.ssdDir) != 0):
                    nodeDirs.append(dbInst.ssdDir)
            for dbInst in dbNode.datanodes:
                nodeDirs.append(dbInst.datadir)
                nodeDirs.append(dbInst.xlogdir)
                if (len(dbInst.ssdDir) != 0):
                    nodeDirs.append(dbInst.ssdDir)
            for dbInst in dbNode.etcds:
                nodeDirs.append(dbInst.datadir)
            clusterDirs[nodeName] = nodeDirs
        return clusterDirs

    def getDbNodeByName(self, name):
        """
        function : Get node by name.
        input : nodename
        output : []
        """
        for dbNode in self.dbNodes:
            if (dbNode.name == name):
                return dbNode

        return None
    
    def setDbNodeInstancdIdByName(self, name, instanceId):
        for dbNode in self.dbNodes:
            if dbNode.name == name and len(dbNode.datanodes) > 0:
                dbNode.datanodes[0].instanceId = instanceId
                return

    def getPeerInstance(self, dbInst):
        """  
        function : Get peer instance of specified instance.
        input : []
        output : []
        """
        instances = []
        for dbNode in self.dbNodes:
            for inst in dbNode.datanodes:
                if (inst.mirrorId == dbInst.mirrorId and
                        inst.instanceId != dbInst.instanceId):
                    instances.append(inst)
        if instances:
            instances.sort(key=lambda inst: inst.instanceId)
        return instances

    def getClusterBackIps(self):
        """
        function : Get the cluster back ips, grouped by ip index (all
                   first back ips, then all second back ips, ...).
        input : NA
        output : [] - compressed back ip strings
        """
        # Every node must define the same number of back ips.
        counts = [len(node.backIps) for node in self.dbNodes]
        if max(counts) != min(counts):
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51227"] % "backIps")
        back_ips = []
        for idx in range(counts[0]):
            back_ips.extend(node.backIps[idx] for node in self.dbNodes)
        return self.compress_ips(back_ips)

    def getClusterSshIps(self):
        """
        function : Get the cluster ssh ips.
        input : NA
        output : [] - one list (per ip index) of compressed ssh ips,
                 each list ordered by node
        """
        # Every node must define the same number of ssh ips.
        counts = [len(node.sshIps) for node in self.dbNodes]
        if max(counts) != min(counts):
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51227"] % "sshIps")
        ssh_ips = []
        for idx in range(counts[0]):
            ips_at_idx = [node.sshIps[idx] for node in self.dbNodes]
            ssh_ips.append(self.compress_ips(ips_at_idx))
        return ssh_ips

    def getazNames(self):
        """
        """
        azMap = {}
        azNames = []
        for dbNode in self.dbNodes:
            azMap[dbNode.azName] = []
            if (dbNode.azName not in azNames):
                azNames.append(dbNode.azName)
        for dbNode in self.dbNodes:
            azMap[dbNode.azName].append(dbNode.azPriority)
        for azName in azNames:
            azMap[azName] = max(azMap[azName])
        azNames = sorted(azMap, key=lambda x: azMap[x])
        return azNames

    def getNodeNameByBackIp(self, backIp):
        """
        function : Get Nodename by backip.
        input : String
        output : String
        """
        nodeName = ""
        for dbNode in self.dbNodes:
            if (backIp in dbNode.backIps):
                nodeName = dbNode.name
                break
        return nodeName

    def __checkInstancePortandIP(self):
        """
        function : Check instance Port and IP.
        input : NA
        output : dict - {node name: [all ips, all ports]} that were checked
        """
        nodeipport = {}
        for dbNode in self.dbNodes:
            nodeips = []
            nodeports = []
            cmsListenIPs = []
            # ipCheckMap collects the "ip1" entries that must all be equal
            # when g_networkType == 1 (checked at the bottom of the loop).
            ipCheckMap = {}
            backIP1 = dbNode.backIps[0]
            nodeips.extend(dbNode.backIps)
            nodeips.extend(dbNode.sshIps)
            # Check whether the ip addresses of the cluster block and device are consistent
            # self.node_num walks cluster_back_ip1s in step with dbNodes, so
            # the DEVICE order must match the <CLUSTER> backIp1s order.
            if backIP1 != self.cluster_back_ip1s[self.node_num]:
                raise Exception(ErrorCode.GAUSS_506["GAUSS_50625"] +
                                "These ip addresses are %s and %s" % (self.cluster_back_ip1s[self.node_num], backIP1)
                                + ". Please check it.")
            self.node_num += 1
            # get node ip and node port from cmserver
            for cmsInst in dbNode.cmservers:
                nodeips.extend(cmsInst.listenIps)
                nodeips.extend(cmsInst.haIps)
                cmsListenIPs = cmsInst.listenIps
                ipCheckMap["cmServerListenIp1"] = cmsInst.listenIps[0]
                ipCheckMap["cmServerHaIp1"] = cmsInst.haIps[0]
                nodeports.append(cmsInst.port)
                nodeports.append(cmsInst.haPort)
            # get node ip and node port from gtm
            for gtmInst in dbNode.gtms:
                nodeips.extend(gtmInst.listenIps)
                nodeips.extend(gtmInst.haIps)
                nodeports.append(gtmInst.port)
                nodeports.append(gtmInst.haPort)
            # get node ip and node port from cn
            for cooInst in dbNode.coordinators:
                nodeips.extend(cooInst.listenIps)
                nodeips.extend(cooInst.haIps)
                nodeports.append(cooInst.port)
                nodeports.append(cooInst.haPort)
            # get node ip and node port from dn
            for dnInst in dbNode.datanodes:
                nodeips.extend(dnInst.listenIps)
                nodeips.extend(dnInst.haIps)
                nodeports.append(dnInst.port)
                nodeports.append(dnInst.haPort)
                # also reserve the derived port (base + 2 * datanode count
                # of this type) when SCTP port checking is enabled
                if (self.checkSctpPort):
                    nodeports.append(dnInst.port +
                                     dbNode.getDnNum(dnInst.instanceType) * 2)
            # get node ip and node port from etcd
            for etcdInst in dbNode.etcds:
                nodeips.extend(etcdInst.listenIps)
                nodeips.extend(etcdInst.haIps)
                nodeports.append(etcdInst.port)
                nodeports.append(etcdInst.haPort)
                ipCheckMap["etcdListenIp1"] = etcdInst.listenIps[0]
                ipCheckMap["etcdHaIp1"] = etcdInst.haIps[0]
                # A second etcd listen ip, when configured, must equal the
                # node's first back ip.
                if (len(etcdInst.listenIps) > 1):
                    etcdListenIp2 = etcdInst.listenIps[1]
                    if (etcdListenIp2 != backIP1):
                        raise Exception(ErrorCode.GAUSS_512["GAUSS_51220"] % (
                                "%s with etcdListenIp2" % etcdListenIp2) +
                                        " Error: \nThe IP address must be "
                                        "the same as the backIP1 %s." %
                                        backIP1)

            # CMS IP must be consistent with CMA IP
            cmaListenIPs = dbNode.cmagents[0].listenIps
            if (cmsListenIPs and cmsListenIPs != cmaListenIPs):
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51220"] % (
                        "%s with cm_server" % cmsListenIPs) +
                                " Error: \nThe IP address must be the same "
                                "as the cm_agent %s." % cmaListenIPs)
            if (g_networkType == 1):
                # Check that every collected "ip1" (cm server/ha, etcd, cm
                # agent) is one and the same address.
                ipCheckMap["cmAgentConnectIp1"] = cmaListenIPs[0]
                if (len(set(ipCheckMap.values())) != 1):
                    errMsg = " Error: \nThe following IPs must be consistent:"
                    for ipConfigItem in list(ipCheckMap.keys()):
                        errMsg += "\n%s: %s" % (
                            ipConfigItem, ipCheckMap[ipConfigItem])
                    raise Exception(ErrorCode.GAUSS_512["GAUSS_51220"] % (
                        "with cm and etcd") + errMsg)
            # create a dictionary
            nodeipport[dbNode.name] = [nodeips, nodeports]
            # check port and ip
            self.__checkPortandIP(nodeips, nodeports, dbNode.name)
        return nodeipport

    def __checkPortandIP(self, ips, ports, name):
        """
        function : Check port and IP validity for one node.
        input : ips - list of ip strings
                ports - list of port numbers
                name - node name (used in error messages)
        output : NA
        raises : Exception when a port or ip is invalid, or when ip
                 versions are inconsistent across the cluster
        """
        # de-duplicate before validating
        ipsCopy = list(set(ips))
        portsCopy = list(set(ports))
        for port in portsCopy:
            if (not self.__isPortValid(port)):
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51233"]
                                % (port, name) + " Please check it.")

        for ip in ipsCopy:
            if (not self.__isIpValid(ip)):
                raise Exception(ErrorCode.GAUSS_506["GAUSS_50603"] + \
                                "The IP address is: %s." % ip + " Please "
                                                                "check it.")
            # self.ips_type accumulates the version of every ip seen so far
            # (across calls, i.e. across all nodes); a mix of versions, or
            # an empty version string from get_ip_version, is rejected.
            self.ips_type.append(get_ip_version(ip))
            if len(set(self.ips_type)) > 1 or (len(set(self.ips_type)) == 1 and ("" in set(self.ips_type))):
                raise Exception(ErrorCode.GAUSS_506["GAUSS_50624"] +
                                "The types of these ip addresses are %s" % self.ips_type + ". Please "
                                                                                           "check it.")

    @staticmethod
    def __read_and_check_config_item(root_node, para, root_type, error_ignore=False):
        """
        function : Read one cluster configuration item and check path valid
        input : root_node: RootNode
                para: param_name
                root_type: clusterType or node
                error_ignore: boolean
        output : String
        """
        status, output = dbClusterInfo.readOneClusterConfigItem(
            root_node, para, root_type)
        if status != 0 and not error_ignore:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % para + " Error: \n%s" % output)
        if output.strip() and para == "installPath":
            output = os.path.normpath(output.strip())
        else:
            output = output.strip()

        if output:
            checkPathVaild(output)
        return output

    def __readClusterGlobalInfo(self):
        """
        Read cluster info from xml config's <CLUSTER> tag except nodeNames,
        clusterRings and sqlExpandNames info
        :return: NA
        """
        global g_networkType
        self.clusterType = CLUSTER_TYPE_SINGLE_INST

        # Read cluster name
        self.name = self.__read_and_check_config_item(xmlRootNode, "clusterName", "cluster")

        # All derived paths live under the configured installPath.
        self.installPath = self.__read_and_check_config_item(xmlRootNode, "installPath", "cluster")
        self.appPath = os.path.join(self.installPath, "gr")
        self.toolPath = os.path.join(self.installPath, "tool")
        self.tmpPath = os.path.join(self.installPath, "tmp")
        self.logPath = os.path.join(self.installPath, "log")
        self.grPath = os.path.join(self.installPath, "gr")
        # wormPath falls back to <installPath>/data when configured empty.
        # NOTE(review): error_ignore=False means a *missing* wormPath item
        # raises instead of hitting the fallback — confirm that is intended.
        self.wormPath = self.__read_and_check_config_item(xmlRootNode, "wormPath", "cluster", False)
        if not self.wormPath:
            self.wormPath = os.path.join(self.installPath, "data")

        # NOTE(review): logPath was just derived from installPath above, so
        # these two checks look purely defensive — confirm whether logPath
        # can actually be empty or relative here.
        if not self.logPath:
            self.logPath = "/var/log/gaussdb"
        if not os.path.isabs(self.logPath):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50213"] % \
                            ("%s log path(%s)" % (
                                VersionInfo.PRODUCT_NAME, self.logPath)))

        # Read enable_dcf; a missing item yields "" which is accepted.
        ret_status, self.enable_dcf = self.readOneClusterConfigItem(xmlRootNode,
                                                               "enable_dcf",
                                                               "cluster")
        if self.enable_dcf not in ['', 'on', 'off']:
            raise Exception(ErrorCode.GAUSS_500["GAUSS_50011"] %
                                ('enable_dcf', self.enable_dcf))

        # dcf_config is mandatory once enable_dcf is on.
        if self.enable_dcf == 'on':
            (ret_status, ret_value) = self.readOneClusterConfigItem(
                xmlRootNode, "dcf_config", "CLUSTER")
            if ret_status == 0:
                self.dcf_config = ret_value.strip()
                # DCF requires at least three voting (non-PASSIVE) roles.
                if self.dcf_config.count('role') - self.dcf_config.count('PASSIVE') < 3:
                    raise Exception(ErrorCode.GAUSS_500["GAUSS_50011"] %
                                    ('dcf_config', self.dcf_config))
            else:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] %
                                'dcf_config' + " Error: \n%s" % ret_value)

        # Read network type: must be 0 or 1; defaults to 0 when absent
        # (status 2 means "item not found").
        (retStatus, retValue) = self.readOneClusterConfigItem(
            xmlRootNode, "networkType", "cluster")
        if retStatus == 0:
            if retValue.isdigit() and int(retValue) in [0, 1]:
                g_networkType = int(retValue)
            else:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                  "cluster network type" + " Error: \nThe parameter value must be 0 or 1.")
        elif retStatus == 2:
            g_networkType = 0
        else:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                            "cluster network type" + " Error: \n%s" % retValue)

        # Read gr info
        status, self.gr_nodes_list = self.readOneClusterConfigItem(
            xmlRootNode, "gr_nodes_list", "cluster")

    def get_cluster_back_ip1s(self):
        """
        Read the comma-separated "backIp1s" item from the <CLUSTER> element
        and store the compressed ip list on self.cluster_back_ip1s.
        """
        status, output = self.readOneClusterConfigItem(
            xmlRootNode, "backIp1s", "cluster")
        if status != 0:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % "backIp1s" + " Error: \n%s" % output)

        cluster_backip1s_str = output.strip()

        # Only populate the attribute when something was configured.
        if output:
            raw_ips = cluster_backip1s_str.split(",")
            self.cluster_back_ip1s = self.compress_ips(raw_ips)

    def __getAllHostnamesFromDEVICELIST(self):
        """
        function : Read all host names from <DEVICELIST> (the value of each
                   DEVICE's PARAM with name="name").
        input : NA
        output : [] - host name strings
        """
        device_lists = xmlRootNode.findall('DEVICELIST')
        if not device_lists:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51200"] % 'DEVICELIST')
        names = []
        for device in device_lists[0].findall('DEVICE'):
            for param in device.findall('PARAM'):
                if param.attrib['name'] == 'name':
                    names.append(param.attrib['value'])
        return names

    def __readClusterNodeInfo(self):
        """
        function : Read cluster node info: validate node names and back
                   ips, build dbNodes, then read per-node cms/dn/cma
                   configuration and flush ports.
        input : NA
        output : NA
        """
        # read cluster node info.
        (retStatus, retValue) = self.readOneClusterConfigItem(xmlRootNode,
                                                                           "nodeNames",
                                                                           "cluster")
        if (retStatus != 0):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                            % "node names" + " Error: \n%s" % retValue)
        nodeNames = []
        backip_types = set()
        nodeNames_tmp = retValue.split(",")
        # Every cluster-level back ip must be valid and all of them must
        # share a single ip version (no ipv4/ipv6 mix).
        for back_ip in self.cluster_back_ip1s:
            if (not self.__isIpValid(back_ip)):
                raise Exception(ErrorCode.GAUSS_506["GAUSS_50603"] + \
                                "The IP address is: %s." % back_ip + " Please "
                                                                    "check it.")
            backip_types.add(get_ip_version(back_ip))
            if len(backip_types) > 1 or (len(backip_types) == 1 and ("" in backip_types)):
                raise Exception(ErrorCode.GAUSS_506["GAUSS_50624"] +
                                "The types of these ip addresses are %s" % backip_types + ". Please "
                                                                                          "check it.")
        for nodename in nodeNames_tmp:
            nodeNames.append(nodename.strip())
        if (len(nodeNames) == 0):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                            "cluster configuration" + " There is no node in "
                                                      "cluster configuration"
                                                      " file.")

        # node names must be unique
        if (len(nodeNames) != len(list(set(nodeNames)))):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                            "cluster configuration" + " There contains "
                                                      "repeated node in "
                                                      "cluster configuration "
                                                      "file.")

        # Check node names
        # Each configured node name must have a matching DEVICE entry.
        nodeNameList = self.__getAllHostnamesFromDEVICELIST()
        if len(nodeNameList) != len(nodeNames):
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] + \
                            " The number of nodeNames and DEVICE are not "
                            "same.")
        for nodeName in nodeNames:
            if nodeName not in nodeNameList:
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] + \
                                " Can not found DEVICE for [%s]." % nodeName)
        # Get basic info of node: name, ip and master instance number etc.
        # Node ids are assigned 1..N in nodeNames order.
        self.dbNodes = []
        i = 1
        for name in nodeNames:
            dbNode = dbNodeInfo(i, name)
            self.__readNodeBasicInfo(dbNode, nodeNames)
            self.dbNodes.append(dbNode)
            i += 1

        # Get cm server info
        for dbNode in self.dbNodes:
            self.__readCmsConfig(dbNode)

        # Get datanode info
        for dbNode in self.dbNodes:
            self.__readDataNodeConfig(dbNode)

        # Get cm agent info
        for dbNode in self.dbNodes:
            self.__readCmaConfig(dbNode)

        # Get gr info
        #for dbNode in self.dbNodes:
        #    self.__readGrConfig(dbNode)

        # set DB port for OLAP
        # propagate each node's AZ settings down to its datanodes first
        for node in self.dbNodes:
            for inst in node.datanodes:
                inst.azName = node.azName
                inst.azPriority = node.azPriority
        self.__setNodePortForSinglePrimaryMultiStandby()

    def compress_ips(self, ips):
        """
        Validate every ip in *ips* and return its canonical (compressed)
        textual form, e.g. collapsing IPv6 zero runs.
        """
        compressed_ips = []
        for raw_ip in ips:
            candidate = raw_ip.strip()
            if not self.__isIpValid(candidate):
                raise Exception(ErrorCode.GAUSS_506["GAUSS_50603"] + \
                    "The IP address is: %s." % candidate + " Please check it.")
            # ipaddress produces the canonical compressed representation
            compressed_ips.append(ipaddress.ip_address(candidate).compressed)
        return compressed_ips

    def __getPeerInstance(self, dbInst):
        """
        function : Get peer instances (same mirrorId, different
                   instanceId) of the given instance, searching the
                   instance list that matches its role.
        input : dbInst - instance whose peers are wanted
        output : [] - peer instances (empty for unknown roles)
        """
        # instance role -> node attribute holding instances of that role
        role_to_attr = {
            INSTANCE_ROLE_CMSERVER: "cmservers",
            INSTANCE_ROLE_GTM: "gtms",
            INSTANCE_ROLE_COODINATOR: "coordinators",
            INSTANCE_ROLE_DATANODE: "datanodes",
        }
        attr = role_to_attr.get(dbInst.instanceRole)
        if attr is None:
            return []
        peers = []
        for node in self.dbNodes:
            for candidate in getattr(node, attr):
                if (candidate.mirrorId == dbInst.mirrorId and
                        candidate.instanceId != dbInst.instanceId):
                    peers.append(candidate)
        return peers

    def __setNodePortForSinglePrimaryMultiStandby(self):
        """
        function : Flush instance ports so that each mirror group shares
                   its master's port: the i-th master of a role on a node
                   gets masterBasePort + i * PORT_STEP_SIZE, ha port is
                   always port + 1.
        input : []
        output : NA
        """
        for dbNode in self.dbNodes:
            i = 0
            # flush DATANODE instance ports; peers copy the stepped port
            for dbInst in dbNode.datanodes:
                if (dbInst.instanceType == MASTER_INSTANCE):
                    dbInst.port = dbNode.masterBasePorts[
                                      INSTANCE_ROLE_DATANODE] + i * \
                                  PORT_STEP_SIZE
                    dbInst.haPort = dbInst.port + 1
                    peerInsts = self.__getPeerInstance(dbInst)
                    for j in range(len(peerInsts)):
                        peerInsts[j].port = dbInst.port
                        peerInsts[j].haPort = peerInsts[j].port + 1
                    i += 1
            # flush CMSERVER instance port
            i = 0
            cmsbaseport = 0
            for dbInst in dbNode.cmservers:
                if (dbInst.instanceType == MASTER_INSTANCE):
                    cmsbaseport = dbNode.masterBasePorts[
                        INSTANCE_ROLE_CMSERVER]
                    dbInst.port = cmsbaseport + i * PORT_STEP_SIZE
                    dbInst.haPort = dbInst.port + 1
                    peerInsts = self.__getPeerInstance(dbInst)
                    for j in range(len(peerInsts)):
                        # NOTE(review): cm server peers get the *base* port
                        # (no step offset), unlike datanode peers which copy
                        # the stepped port — confirm this asymmetry is
                        # intended.
                        peerInsts[j].port = cmsbaseport
                        peerInsts[j].haPort = peerInsts[j].port + 1
                    i += 1
            # flush GTM instance port
            i = 0
            gtmbaseport = 0
            for dbInst in dbNode.gtms:
                if (dbInst.instanceType == MASTER_INSTANCE):
                    gtmbaseport = dbNode.masterBasePorts[INSTANCE_ROLE_GTM]
                    dbInst.port = gtmbaseport + i * PORT_STEP_SIZE
                    dbInst.haPort = dbInst.port + 1
                    peerInsts = self.__getPeerInstance(dbInst)
                    for j in range(len(peerInsts)):
                        # gtm peers also get the base port (see note above
                        # for cm server)
                        peerInsts[j].port = gtmbaseport
                        peerInsts[j].haPort = peerInsts[j].port + 1
                    i += 1

    def set_cm_info_for_node(self, node, node_names):
        """
        Validate the node's cmDir value and record its CM server base port.

        Rejects a cmDataDir that looks like a comma-separated list naming
        another cluster node (primary/standby style cmDir is not allowed;
        only a single path may be configured). For nodes hosting at least
        one cm_server, reads cmServerPortBase (default
        MASTER_BASEPORT_CMS) into masterBasePorts, mirroring it into
        standbyBasePorts for single-instance clusters.
        """
        compact_dir = node.cmDataDir.replace(" ", "")
        for node_name in node_names:
            marker = "," + node_name.replace(" ", "") + ","
            if compact_dir.find(marker) >= 0:
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51235"] %
                                node.cmDataDir +
                                " The cmDir only need one path while you configure "
                                "it with primary and standby cmDir, "
                                "please modify it and try again. "
                                "You can examine the install guide "
                                "for more information to configure xml file.")

        # Get base port
        if node.cmsNum > 0:
            base_port = self.__readNodeIntValue(node.name, "cmServerPortBase",
                                                True, MASTER_BASEPORT_CMS)
            node.masterBasePorts[INSTANCE_ROLE_CMSERVER] = base_port
            if self.isSingleInstCluster():
                node.standbyBasePorts[INSTANCE_ROLE_CMSERVER] = base_port

    def get_local_node_info(self):
        """
        Return the dbNode whose id equals the local node id, or None when
        no node matches.
        """
        return next((node for node in self.dbNodes
                     if node.id == self.localNodeId), None)

    def __readNodeBasicInfo(self, dbNode, nodenames):
        """
        function : Read basic info of specified node.

        Populates dbNode in place from the XML config: back/ssh/virtual
        IPs, ssh port, per-role instance counts, CM directory, datanode
        base ports, AZ name/priority and cascade role. The try/except
        around the cmDir read records per-node CM presence in
        self.cm_state_list so that a mixed (some nodes with cmDir, some
        without) configuration can be rejected.
        input : dbNode - node object to fill
                nodenames - names of all configured nodes
        output : NA
        """
        # get backIp
        dbNode.backIps = self.compress_ips(self.__readNodeIps(dbNode.name, "backIp"))
        if (len(dbNode.backIps) == 0):
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51207"] % dbNode.name)
        # get sshIp; fall back to the back IPs when not configured
        dbNode.sshIps = self.compress_ips(self.__readNodeIps(dbNode.name, "sshIp"))
        if (len(dbNode.sshIps) == 0):
            dbNode.sshIps = dbNode.backIps[:]
        # get virtualIp
        dbNode.virtualIp = self.__readVirtualIp(dbNode.name, "virtualIp")
        # get ssh_port (default 22)
        dbNode.ssh_port = self.__readNodeIntValue(dbNode.name, "sshPort", True, 22)
        # Get cm_server number
        dbNode.cmsNum = self.__readNodeIntValue(dbNode.name, "cmsNum", True, 0)
        # Get gtm number
        dbNode.gtmNum = self.__readNodeIntValue(dbNode.name, "gtmNum", True, 0)
        # Get etcd number
        dbNode.etcdNum = self.__readNodeIntValue(dbNode.name, "etcdNum", True,
                                                 0)
        # Get cn number
        dbNode.cooNum = self.__readNodeIntValue(dbNode.name, "cooNum", True, 0)
        # Get DB number
        dbNode.dataNum = self.__readNodeIntValue(dbNode.name, "dataNum", True,
                                                 0)
        # read cm directory for server and agent; record whether this node
        # configured CM at all, then verify all nodes agree
        try:
            dbNode.cmDataDir = self.__readNodeStrValue(dbNode.name, "cmDir")
            self.cm_state_list.append(True)
        except Exception as _:
            self.cm_state_list.append(False)
            if not self.check_conf_cm_state():
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] %
                                ("CM", "has same configure."))
        if self.check_conf_cm_state():
            self.set_cm_info_for_node(dbNode, nodenames)

        # check dataNum
        if dbNode.dataNum < 0:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51208"] % ("dn", dbNode.dataNum))

        # Get base port (standby base port mirrors the master base port)
        if dbNode.dataNum > 0:
            dbNode.masterBasePorts[INSTANCE_ROLE_DATANODE] = \
                self.__readNodeIntValue(dbNode.name, "dataPortBase",
                                        True, MASTER_BASEPORT_DATA)
            dbNode.standbyBasePorts[INSTANCE_ROLE_DATANODE] = \
                dbNode.masterBasePorts[INSTANCE_ROLE_DATANODE]

        # Get az name
        dbNode.azName = self.__readNodeStrValue(dbNode.name, "azName")
        # check azName
        # Get az Priority (default 0, range-checked below)
        dbNode.azPriority = self.__readNodeIntValue(dbNode.name, "azPriority",
                                                    True, 0)
        # get cascadeRole
        dbNode.cascadeRole = self.__readNodeStrValue(dbNode.name, "cascadeRole",
                                                     True, "off")
        if (dbNode.azPriority < AZPRIORITY_MIN or
                dbNode.azPriority > AZPRIORITY_MAX):
            raise Exception(ErrorCode.GAUSS_532["GAUSS_53206"] % "azPriority")

        if not dbNode.azName:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51212"] % ("azName"))
        if dbNode.azPriority < 1:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51208"]
                            % ("azPriority", dbNode.azPriority))
        # oGRecorder-related addresses read as plain strings
        dbNode.grIp1 = self.__readNodeStrValue(dbNode.name, "grIp1")
        dbNode.listen_addr = self.__readNodeStrValue(dbNode.name, "listen_addr")

    def __getCmsCountFromWhichConfiguredNode(self, masterNode):
        """
        function : get the count of cmservers if current node configured
        cmserver
        input : masterNode - the node whose cmServerRelation is read
        output : cmsCount - number of hosts listed in cmServerRelation

        Raises when cmServerRelation is missing/empty, when its host
        count differs from the DEVICELIST host count, or when it names an
        unknown host.
        """
        cms_value = self.__readNodeStrValue(masterNode.name,
                                            "cmServerRelation", True, "")
        # "".split(",") yields [''] (length 1), so test the raw value
        # first; otherwise the "configuration missing" error below is
        # unreachable and an empty value slips through as one bogus host.
        cms_list = [name.strip() for name in cms_value.split(",")] \
            if cms_value.strip() else []
        cms_count = len(cms_list)
        # hoist: the DEVICELIST hostnames are loop-invariant
        device_names = self.__getAllHostnamesFromDEVICELIST()
        device_count = len(device_names)
        if (cms_count == 0):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                            % ("CMServer configuration on host [%s]"
                               % str(masterNode.name))
                            + " The information of %s is wrong."
                            % "cmServerRelation")

        if cms_count != device_count:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                            % "CMServer configuration, "
                               "The num of cmServerRelation's hostname is wrong, "
                               "Please check it.")

        for name_node in cms_list:
            if name_node not in device_names:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                                % ("The information of %s:%s is wrong.")
                                % ("cmServerRelation", name_node))
        return cms_count

    def __readCmsConfig(self, masterNode):
        """
        function : Read cm server config on node.

        Thin wrapper: delegates to the multi-AZ reader, which is the only
        supported layout here.
        input : masterNode - node whose cm_server configuration is read
        output : NA
        """
        self.__readCmsConfigForMutilAZ(masterNode)

    def __readCmsConfigForMutilAZ(self, masterNode):
        """
        Read cm_server configuration for a multi-AZ cluster and append a
        MASTER cm_server instance on masterNode plus one STANDBY instance
        on every other host named in cmServerRelation. All instances of
        one relation share a mirror id; each gets its own listen/HA IP
        row from cmServerListenIp / cmServerHaIp.
        """
        cmsListenIps = None
        cmsHaIps = None
        if (masterNode.cmsNum > 0):
            self.cmscount = self.__getCmsCountFromWhichConfiguredNode(
                masterNode)
            # one IP row per cm_server instance across the relation
            cmsListenIps = self.__readInstanceIps(masterNode.name,
                                                  "cmServerListenIp",
                                                  self.cmscount)
            cmsHaIps = self.__readInstanceIps(masterNode.name, "cmServerHaIp",
                                              self.cmscount)

        for i in range(masterNode.cmsNum):
            # Read "cmServerlevel" first, falling back to "cmServerLevel".
            # NOTE(review): comparing the result of an *IntValue* read
            # against "" suggests the reader returns "" for a missing key
            # — confirm against __readNodeIntValue before changing this.
            level = self.__readNodeIntValue(masterNode.name, "cmServerlevel")
            if level == "":
                level = self.__readNodeIntValue(masterNode.name, "cmServerLevel")
            hostNames = []
            hostNames_tmp = \
                self.__readNodeStrValue(masterNode.name,
                                        "cmServerRelation").split(",")
            for hostname in hostNames_tmp:
                hostNames.append(hostname.strip())

            instId = self.__assignNewInstanceId(INSTANCE_ROLE_CMSERVER)
            mirrorId = self.__assignNewMirrorId()
            # IP-table row index for the master of relation i
            instIndex = i * self.cmscount
            masterNode.appendInstance(instId, mirrorId, INSTANCE_ROLE_CMSERVER,
                                      MASTER_INSTANCE, cmsListenIps[instIndex],
                                      cmsHaIps[instIndex], "", "", level)

            # remaining hostnames become standbys of the same mirror
            for j in range(1, self.cmscount):
                dbNode = self.getDbNodeByName(hostNames[j])
                if dbNode is None:
                    raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                                    % ("CMServer configuration on host [%s]"
                                       % masterNode.name)
                                    + " There is no host named %s."
                                    % hostNames[j])
                instId = self.__assignNewInstanceId(INSTANCE_ROLE_CMSERVER)
                instIndex += 1
                dbNode.appendInstance(instId, mirrorId, INSTANCE_ROLE_CMSERVER,
                                      STANDBY_INSTANCE,
                                      cmsListenIps[instIndex],
                                      cmsHaIps[instIndex], "", "", level)

    def __getDataNodeCount(self, masterNode):
        """
        function : get the count of data nodes
        input : masterNode
        output : dataNodeCount

        The "dataNode1" value is a comma-separated "dir,host,dir,host,..."
        sequence, so (len + 1) // 2 of its entries are instances.
        """
        raw_value = self.__readNodeStrValue(masterNode.name, "dataNode1",
                                            True, "")
        entries = raw_value.split(",")
        return (len(entries) + 1) // 2

    def __readDataNodeConfig(self, masterNode):
        """
        function : Read datanode config on node.

        Thin wrapper: delegates to the multi-AZ reader, which is the only
        supported layout here.
        input : masterNode - node whose datanode configuration is read
        output : NA
        """
        self.__readDataNodeConfigForMutilAZ(masterNode)

    def __readDataNodeConfigForMutilAZ(self, masterNode):
        """
        Read the datanode (shard) layout configured on masterNode and
        append one MASTER instance on masterNode plus STANDBY instances
        on every peer host, for each of masterNode.dataNum shards.

        Per shard i (keys are 1-based) the XML provides:
          - dataNodeN          : "dir,host,dir,host,..." (master dir first,
                                 then standby host/dir pairs)
          - dataNodeXlogPathN  : optional xlog dirs, one per instance
          - dataNodeN_syncNum  : optional synchronous-standby count
          - syncNode_<host>    : optional synchronous-node string
        plus per-node IP tables dataListenIp / dataHaIp / floatIpMap.
        The method runs two passes: first parse/validate all shards, then
        create the instances.
        """
        dnListenIps = None
        dnHaIps = None
        dn_float_ips = None
        mirror_count_data = self.__getDataNodeCount(masterNode)
        if masterNode.dataNum > 0:
            # one IP row per datanode instance (dataNum shards *
            # mirror_count_data instances per shard)
            dnListenIps = self.__readInstanceIps(masterNode.name,
                                                 "dataListenIp",
                                                 masterNode.dataNum *
                                                 mirror_count_data)
            dnHaIps = self.__readInstanceIps(masterNode.name, "dataHaIp",
                                             masterNode.dataNum *
                                             mirror_count_data)
            dn_float_ips = self.__readInstanceIps(masterNode.name,
                                                  "floatIpMap",
                                                  masterNode.dataNum *
                                                  mirror_count_data)
        if dn_float_ips is not None:
            self.__read_cluster_float_ips(dn_float_ips)
        # per-shard accumulators, indexed by shard number
        dnInfoLists = [[] for row in range(masterNode.dataNum)]
        xlogInfoLists = [[] for row in range(masterNode.dataNum)]
        dcf_data_lists = [[] for row in range(masterNode.dataNum)]
        ssdInfoList = [[] for row in range(masterNode.dataNum)]
        syncNumList = [-1 for row in range(masterNode.dataNum)]
        syncNumFirstList = [[] for row in range(masterNode.dataNum)]
        totalDnInstanceNum = 0
        # Whether the primary and standby have SET XLOG PATH , must be
        # synchronized
        has_xlog_path = 0
        # -- first pass: parse and validate each dataNodeN entry --
        for i in range(masterNode.dataNum):
            dnInfoList = []
            key = "dataNode%d" % (i + 1)
            dnInfoList_tmp = self.__readNodeStrValue(masterNode.name,
                                                     key).split(",")
            for dnInfo in dnInfoList_tmp:
                dnInfoList.append(dnInfo.strip())
            dnInfoListLen = len(dnInfoList)
            # expect dir + (host,dir) per standby -> 2*count - 1 entries
            if dnInfoListLen != 2 * mirror_count_data - 1:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                                ("database node configuration on host [%s]"
                                 % masterNode.name)
                                + " The information of [%s] is wrong." % key)
            totalDnInstanceNum += (dnInfoListLen + 1) // 2
            dnInfoLists[i].extend(dnInfoList)

            # If not set dataNodeXlogPath in xmlfile,just set
            # xlogInfoListLen = 0,Used for judgement.
            # If set dataNodeXlogPath in xmlfile,each datanode needs to have
            # a corresponding xlogdir.
            xlogInfoList = []
            xlogkey = "dataNodeXlogPath%d" % (i + 1)
            xlogInfoList_tmp = self.__readNodeStrValue(masterNode.name,
                                                       xlogkey).split(",")
            for xlogInfo in xlogInfoList_tmp:
                xlogInfoList.append(xlogInfo.strip())

            # This judgment is necessary,if not set dataNodeXlogPath,
            # xlogInfoListLen will equal 1.
            # Because dninfolist must be set, it does not need extra judgment.
            if xlogInfoList_tmp == ['']:
                xlogInfoListLen = 0
            else:
                xlogInfoListLen = len(xlogInfoList)

            # all shards must agree on whether xlog paths are configured
            if i == 0:
                has_xlog_path = xlogInfoListLen

            if xlogInfoListLen != has_xlog_path:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                                ("database node configuration on host [%s]"
                                 % masterNode.name)
                                + " The information of [%s] is wrong."
                                % xlogkey)

            # when configured, there must be exactly one xlog dir per
            # instance of the shard
            if (xlogInfoListLen != 0 and xlogInfoListLen != (dnInfoListLen + 1) // 2):
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                                ("database node configuration on host [%s]"
                                 % masterNode.name)
                                + " The information of [%s] is wrong."
                                % xlogkey)
            xlogInfoLists[i].extend(xlogInfoList)
            # DCF mode: derive a dcf_data dir from each instance's data dir
            dcf_data_list = []
            if self.enable_dcf == "on":
                if self.cmscount < 3:
                    raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] +
                                    "At least three cm_server instances are required.")
                for dcf_info in range(0, mirror_count_data * 2, 2):
                    dcf_data_list.append(dnInfoList_tmp[dcf_info] + '/dcf_data')
                dcf_data_lists[i].extend(dcf_data_list)
            else:
                dcf_data_list = ['' for i in range(mirror_count_data)]

            key = "ssdDNDir%d" % (i + 1)
            # ssd doesn't supply ,so set ssddir value to empty
            ssddirList = []
            # NOTE(review): resetting the loop variable i here redirects
            # the ssdInfoList/syncNum handling below to shard 0 whenever
            # enable_dcf is "" — looks suspicious; confirm intent before
            # touching it.
            if self.enable_dcf == "":
                i = 0
            ssdInfoList[i].extend(ssddirList)

            # dataNode syncNum
            key = "dataNode%d_syncNum" % (i + 1)
            syncNum_temp = self.__readNodeStrValue(masterNode.name, key)
            if syncNum_temp is not None and syncNum_temp != "":
                syncNum = int(syncNum_temp)
                if syncNum < 0 or syncNum >= totalDnInstanceNum:
                    raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                                    ("database node configuration on host [%s]"
                                    % masterNode.name)
                                    + " The information of [%s] is wrong."
                                    % key)
                syncNumList[i] = syncNum

        # check ip num
        # every configured dataListenIp column must supply exactly one IP
        # per datanode instance
        if dnListenIps is not None and len(dnListenIps[0]) != 0:
            colNum = len(dnListenIps[0])
            rowNum = len(dnListenIps)
            for col in range(colNum):
                ipNum = 0
                for row in range(rowNum):
                    if dnListenIps[row][col] != "":
                        ipNum += 1
                    else:
                        break
                if ipNum != totalDnInstanceNum:
                    raise Exception(ErrorCode.GAUSS_516["GAUSS_51637"] % \
                                    ("IP number of dataListenIp",
                                     "instance number"))

        # same check for dataHaIp
        if dnHaIps is not None and len(dnHaIps[0]) != 0:
            colNum = len(dnHaIps[0])
            rowNum = len(dnHaIps)
            for col in range(colNum):
                ipNum = 0
                for row in range(rowNum):
                    if dnHaIps[row][col] != "":
                        ipNum += 1
                    else:
                        break
                if ipNum != totalDnInstanceNum:
                    raise Exception(ErrorCode.GAUSS_516["GAUSS_51637"] % \
                                    ("IP number of dataHaIps",
                                     "instance number"))

        # -- second pass: create master + standby instances per shard --
        # NOTE(review): xlogInfoListLen, dcf_data_list and ssddirList below
        # still hold the values from the FIRST pass's last iteration —
        # confirm this cross-loop reuse is intended.
        instIndex = 0
        for i in range(masterNode.dataNum):
            dnInfoList = dnInfoLists[i]
            key = "syncNode_%s" % (masterNode.name)
            # NOTE(review): key is read twice; if the second read returned
            # None, syncNumFirst would be referenced unbound — presumably
            # both reads always agree; verify.
            if self.__readNodeStrValue(masterNode.name, key) is not None:
                syncNumFirst_temp = self.__readNodeStrValue(masterNode.name, key)
                if syncNumFirst_temp is not None:
                    syncNumFirst = syncNumFirst_temp
                syncNumFirstList[i] = syncNumFirst

            # Because xlog may not be set to prevent the array from crossing
            # the boundary
            if xlogInfoListLen != 0:
                xlogInfoList = xlogInfoLists[i]
            groupId = self.__assignNewGroupId()
            if len(ssdInfoList[i]) > 1:
                ssddirList = ssdInfoList[i]
            # master datanode
            instId = self.__assignNewInstanceId(INSTANCE_ROLE_DATANODE)
            # ssd doesn't supply ,this branch will not arrive when len(
            # ssdInfoList[i])  is 0
            if len(ssdInfoList[i]) > 1:
                if xlogInfoListLen == 0:
                    if self.enable_dcf == "on":
                        masterNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              MASTER_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[0], ssddirList[0],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i],
                                              dcf_data=dcf_data_list[0])
                    else:
                        masterNode.appendInstance(instId, groupId,
                                                  INSTANCE_ROLE_DATANODE,
                                                  MASTER_INSTANCE,
                                                  dnListenIps[instIndex],
                                                  dnHaIps[instIndex],
                                                  dnInfoList[0], ssddirList[0],
                                                  syncNum=syncNumList[i],
                                                  syncNumFirst=syncNumFirstList[i])
                else:
                    masterNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              MASTER_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[0], ssddirList[0],
                                              xlogdir=xlogInfoList[0],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i])
            else:
                if xlogInfoListLen == 0:
                    if self.enable_dcf == "on":
                        masterNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              MASTER_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[0],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i],
                                              dcf_data=dcf_data_list[0],
                                              float_ips=dn_float_ips[instIndex] \
                                              if dn_float_ips else [])
                    else:
                        masterNode.appendInstance(instId, groupId,
                                                  INSTANCE_ROLE_DATANODE,
                                                  MASTER_INSTANCE,
                                                  dnListenIps[instIndex],
                                                  dnHaIps[instIndex],
                                                  dnInfoList[0],
                                                  syncNum=syncNumList[i],
                                                  syncNumFirst=syncNumFirstList[i],
                                                  float_ips=dn_float_ips[instIndex] \
                                                  if dn_float_ips else [])
                else:
                    masterNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              MASTER_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[0],
                                              xlogdir=xlogInfoList[0],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i],
                                              float_ips=dn_float_ips[instIndex] \
                                              if dn_float_ips else [])

            instIndex += 1

            # standby instances: dnInfoList holds (host, dir) pairs after
            # the master dir
            for nodeLen in range((len(dnInfoList) + 1) // 2 - 1):
                dbNode = self.getDbNodeByName(dnInfoList[nodeLen * 2 + 1])
                if dbNode is None:
                    raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                                    % ("database node configuration on "
                                       "host [%s]" % str(masterNode.name))
                                    + " There is no host named %s."
                                    % dnInfoList[nodeLen * 2 + 1])
                instId = self.__assignNewInstanceId(INSTANCE_ROLE_DATANODE)
                
                # standby's syncNode value overrides the master's entry
                syncNumFirstList[i] = ""
                key = "syncNode_%s" % (dbNode.name)
                if self.__readNodeStrValue(dbNode.name, key) is not None:
                    syncNumFirst_temp = self.__readNodeStrValue(dbNode.name, key)
                    if syncNumFirst_temp is not None:
                        syncNumFirst = syncNumFirst_temp
                    syncNumFirstList[i] = syncNumFirst

                # ssd doesn't supply ,this branch will not arrive when len(
                # ssdInfoList[i])  is 0
                if len(ssdInfoList[i]) > 1:
                    if xlogInfoListLen == 0:
                        if self.enable_dcf == "on":
                            dbNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              STANDBY_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[nodeLen * 2 + 2],
                                              ssddirList[nodeLen * 2 + 1],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i],
                                              dcf_data=dcf_data_list[0])
                        else:
                            dbNode.appendInstance(instId, groupId,
                                                  INSTANCE_ROLE_DATANODE,
                                                  STANDBY_INSTANCE,
                                                  dnListenIps[instIndex],
                                                  dnHaIps[instIndex],
                                                  dnInfoList[nodeLen * 2 + 2],
                                                  ssddirList[nodeLen * 2 + 1],
                                                  syncNum=syncNumList[i],
                                                  syncNumFirst=syncNumFirstList[i])
                    else:
                        if self.enable_dcf == "on":
                            dbNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              STANDBY_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[nodeLen * 2 + 2],
                                              ssddirList[nodeLen * 2 + 1],
                                              xlogdir=xlogInfoList[nodeLen + 1],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i],
                                              dcf_data=dcf_data_list[0])
                        else:
                            dbNode.appendInstance(instId, groupId,
                                                  INSTANCE_ROLE_DATANODE,
                                                  STANDBY_INSTANCE,
                                                  dnListenIps[instIndex],
                                                  dnHaIps[instIndex],
                                                  dnInfoList[nodeLen * 2 + 2],
                                                  ssddirList[nodeLen * 2 + 1],
                                                  xlogdir=xlogInfoList[nodeLen + 1],
                                                  syncNum=syncNumList[i],
                                                  syncNumFirst=syncNumFirstList[i])
                else:
                    if xlogInfoListLen == 0:
                        if self.enable_dcf == "on":
                            dbNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              STANDBY_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[nodeLen * 2 + 2],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i],
                                              dcf_data=dcf_data_list[0],
                                              float_ips=dn_float_ips[instIndex] \
                                              if dn_float_ips else [])
                        else:
                            dbNode.appendInstance(instId, groupId,
                                                  INSTANCE_ROLE_DATANODE,
                                                  STANDBY_INSTANCE,
                                                  dnListenIps[instIndex],
                                                  dnHaIps[instIndex],
                                                  dnInfoList[nodeLen * 2 + 2],
                                                  syncNum=syncNumList[i],
                                                  syncNumFirst=syncNumFirstList[i],
                                                  float_ips=dn_float_ips[instIndex] \
                                                  if dn_float_ips else [])
                    else:
                        if self.enable_dcf == "on":
                            dbNode.appendInstance(instId, groupId,
                                              INSTANCE_ROLE_DATANODE,
                                              STANDBY_INSTANCE,
                                              dnListenIps[instIndex],
                                              dnHaIps[instIndex],
                                              dnInfoList[nodeLen * 2 + 2],
                                              xlogdir=xlogInfoList[nodeLen + 1],
                                              syncNum=syncNumList[i],
                                              syncNumFirst=syncNumFirstList[i],
                                              dcf_data=dcf_data_list[0],
                                              float_ips=dn_float_ips[instIndex] \
                                              if dn_float_ips else [])
                        else:
                            dbNode.appendInstance(instId, groupId,
                                                  INSTANCE_ROLE_DATANODE,
                                                  STANDBY_INSTANCE,
                                                  dnListenIps[instIndex],
                                                  dnHaIps[instIndex],
                                                  dnInfoList[nodeLen * 2 + 2],
                                                  xlogdir=xlogInfoList[nodeLen + 1],
                                                  syncNum=syncNumList[i],
                                                  syncNumFirst=syncNumFirstList[i],
                                                  float_ips=dn_float_ips[instIndex] \
                                                  if dn_float_ips else [])
                # cascade standbys are not supported together with DCF
                if dbNode.cascadeRole == "on":
                    if self.enable_dcf != "on":
                        for inst in dbNode.datanodes:
                            inst.instanceType = CASCADE_STANDBY
                    else:
                        raise Exception(ErrorCode.GAUSS_512["GAUSS_51244"] %
                                        "In DCF mode cascadeRole")
                instIndex += 1

        # every datanode on the master node inherits the node's AZ name
        for inst in masterNode.datanodes:
            inst.azName = masterNode.azName

    @staticmethod
    def append_map_ip_into_global(strem_ip_map):
        """
        Parse a "(ip,dataIp)(ip,dataIp)..." mapping string into a list of
        {"ip": ..., "dataIp": ...} dicts, validating every address and
        raising when a group does not hold exactly two entries.
        """
        shard_map = []
        fragments = [part.strip().strip("),").strip(",(")
                     for part in strem_ip_map.split("(") if part]
        for fragment in fragments:
            pieces = fragment.split(",")
            if len(pieces) != 2:
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] +
                                " check localStreamIpmap is correct")
            listen_ip = pieces[0].strip()
            SecurityChecker.check_ip_valid(listen_ip, listen_ip)
            data_ip = pieces[1].strip()
            SecurityChecker.check_ip_valid(data_ip, data_ip)
            shard_map.append({"ip": listen_ip, "dataIp": data_ip})
        return shard_map

    def __readCmaConfig(self, dbNode):
        """
        Append the single cm_agent instance configured on dbNode, using
        the node's cmAgentConnectIp row as its listen addresses.
        """
        connect_ips = self.__readInstanceIps(dbNode.name,
                                             "cmAgentConnectIp", 1)
        agent_id = self.__assignNewInstanceId(INSTANCE_ROLE_CMAGENT)
        dbNode.appendInstance(agent_id, MIRROR_ID_AGENT,
                              INSTANCE_ROLE_CMAGENT,
                              INSTANCE_TYPE_UNDEFINED,
                              connect_ips[0], None, "")

    def __readGrConfig(self, dbNode):
        """
        function : Read oGRecorder config on node.
                   Currently a no-op: the parsing below is disabled.
        input : dbNode - node whose oGRecorder settings would be filled in
        output : NA
        """
        # NOTE(review): the original ip parsing is kept for reference but
        # commented out — confirm whether oGRecorder config should be read.
        # dbNode.grIp1 = self.compress_ips(self.__readNodeIps(dbNode.name, "grIp1"))
        # dbNode.listen_addr = self.compress_ips(self.__readNodeIps(dbNode.name, "listen_addr"))

    def __assignNewInstanceId(self, instRole):
        """
        Hand out the next free instance id for the given instance role.

        For datanodes the sequence jumps over the gap between the old and
        the new primary/standby base-id ranges.

        :param instRole: role constant indexing the id counters
        :return: the id assigned to the caller
        """
        assigned = self.__newInstanceId[instRole]
        if (instRole == INSTANCE_ROLE_DATANODE
                and assigned == OLD_LAST_PRIMARYSTANDBY_BASEID_NUM):
            # Skip the reserved gap up to the new base-id range.
            self.__newInstanceId[instRole] = (
                assigned + 1
                + NEW_FIRST_PRIMARYSTANDBY_BASEID_NUM
                - OLD_LAST_PRIMARYSTANDBY_BASEID_NUM)
        else:
            self.__newInstanceId[instRole] = assigned + 1
        return assigned

    def __assignNewMirrorId(self):
        """
        Allocate and return the next unused mirror id.
        """
        self.__newMirrorId = self.__newMirrorId + 1
        return self.__newMirrorId

    def __assignNewGroupId(self):
        """Allocate and return the next unused node-group id."""
        self.__newGroupId = self.__newGroupId + 1
        return self.__newGroupId

    def __readNodeIps(self, nodeName, prefix):
        """
        Collect the ip values "<prefix>1", "<prefix>2", ... configured for a
        node, stopping at the first missing entry.

        :param nodeName: node whose config is read
        :param prefix:   key prefix such as "backIp", "sshIp"
        :return: list of the configured values, in index order
        """
        # A few prefixes allow extra slots beyond CONFIG_IP_NUM.
        extra = {"cooListenIp": 3, "etcdListenIp": 2}.get(prefix, 1)

        collected = []
        for idx in range(1, CONFIG_IP_NUM + extra):
            item = self.__readNodeStrValue(nodeName,
                                           "%s%d" % (prefix, idx), True, "")
            if not item:
                break
            collected.append(item)
        return collected

    def __readVirtualIp(self, nodeName, prefix):
        """
        Read the comma-separated virtual ip list configured under *prefix*
        for a node, dropping duplicates while keeping first-seen order.
        """
        unique_ips = []
        raw = self.__readNodeStrValue(nodeName, prefix, True, "")
        if raw:
            for candidate in raw.split(","):
                candidate = candidate.strip()
                if candidate not in unique_ips:
                    unique_ips.append(candidate)
        return self.compress_ips(unique_ips)

    def __isIpValid(self, ip):
        """  
        function : check if the input ip address is valid
        input : String
        output : NA
        """
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False

    def __isPortValid(self, port):
        """   
        function :Judge if the port is valid
        input : int
        output : boolean
        """
        if (port < 0 or port > 65535):
            return False
        elif (port >= 0 and port <= 1023):
            return False
        else:
            return True

    def __readInstanceIps(self, nodeName, prefix, InstCount):
        """
        Read per-instance ip lists for a node.

        Each configured key "<prefix>N" holds a comma-separated list with one
        entry per instance; the result is a matrix with one row per instance
        and one column per configured key.

        :raises Exception: GAUSS_50204 when a list's length != InstCount
        """
        ip_groups = self.__readNodeIps(nodeName, prefix)
        group_count = len(ip_groups)
        if not group_count:
            return [[] for _ in range(InstCount)]

        matrix = [["" for _ in range(group_count)]
                  for _ in range(InstCount)]
        for col, group in enumerate(ip_groups):
            ips = [piece.strip() for piece in group.split(",")]
            # floatIpMap entries keep their raw form; others are compressed.
            if prefix != "floatIpMap":
                ips = self.compress_ips(ips)
            if len(ips) != InstCount:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"]
                                % ("[%s] of node [%s]" % (prefix, nodeName))
                                + " The count of IP is wrong.")
            for row in range(len(ips)):
                matrix[row][col] = ips[row]

        return matrix

    def __readNodeIntValue(self, nodeName, key, nullable=False, defValue=0):
        """
        Read an integer configuration value of the specified node.

        :param nodeName: name of the node the key belongs to
        :param key:      configuration key to read
        :param nullable: when True, a missing key yields defValue
        :param defValue: fallback for missing or non-numeric values
        :return: the parsed int, or defValue
        """
        strValue = self.__readNodeStrValue(nodeName, key, nullable, "")
        if not strValue:
            return defValue
        try:
            return int(strValue)
        except ValueError:
            # int() on a str can only raise ValueError; non-numeric text
            # falls back to the default (historical best-effort behavior)
            # instead of the previous blanket `except Exception`.
            return defValue

    def __readNodeStrValue(self, nodeName, key, nullable=False, defValue=""):
        """
        Read the string value of configuration item *key* for node *nodeName*.

        Returns defValue when the item is absent (status 2) and either
        nullable is set or the key is one of the optional items
        (dataNodeXlogPath / syncNum / syncNode / cmServerlevel);
        raises GAUSS_50204 otherwise.
        """
        status, value = self.readOneClusterConfigItem(
            xmlRootNode, key, "node", nodeName)
        if status == 0:
            return str(value).strip()
        if status == 2:
            optional = ("dataNodeXlogPath" in key or "syncNum" in key
                        or "syncNode" in key or "cmServerlevel" == key)
            if nullable or optional:
                return defValue
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                        ("[%s] of node [%s]" % (key, nodeName)) + \
                        " Return status: %d. value: %s. Check whether "
                        "the dataNum is correct first."
                        % (status, value))

    def __checkAZForSingleInst(self):
        """
        Verify the replication layout: every primary datanode must have the
        same number of standbys, and that number must not exceed 8.

        :raises Exception: GAUSS_53200 on mismatched standby counts,
                           GAUSS_51230 when there are more than 8 standbys
        """
        expected_peers = 0
        for node in self.dbNodes:
            for instance in node.datanodes:
                if instance.instanceType != MASTER_INSTANCE:
                    continue
                peer_count = len(self.getPeerInstance(instance))
                if expected_peers == 0:
                    expected_peers = peer_count
                elif expected_peers != peer_count:
                    raise Exception(ErrorCode.GAUSS_532["GAUSS_53200"])

        if expected_peers > 8:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] % (
                "database node standbys", "be less than 9") + " Please set it.")

    def __getDNPeerInstance(self, dbInst):
        """
        Get the DB peer instances of the specified instance, used when
        writing the static configuration file.

        :param dbInst: datanode instance whose peers are collected
        :return: list of peer instances; sorted by instanceId when dbInst
                 is a primary
        """
        peers = [inst
                 for dbNode in self.dbNodes
                 for inst in dbNode.datanodes
                 if inst.mirrorId == dbInst.mirrorId
                 and inst.instanceId != dbInst.instanceId]

        # In a primary multi-standby cluster the CM update of system tables
        # depends on the DB read/write sequence in the static configuration
        # file, so the primary's standby list must be ordered by instanceId.
        # (Direct key-sort replaces the previous O(n^2) id-matching loops;
        # instance ids are unique, so the resulting order is identical.)
        if dbInst.instanceType == MASTER_INSTANCE:
            peers.sort(key=lambda inst: inst.instanceId)
        return peers

    def saveToStaticConfig(self, filePath, localNodeId, dbNodes=None,
                           upgrade=False):
        """
        function : Save cluster info into to static config
        input : filePath - target static config file path
                localNodeId - id of the node this file is written for
                dbNodes - optional node list overriding self.dbNodes
                upgrade - True when writing during upgrade; selects the
                          width of the crc field from the old version number
        output : NA
        """
        fp = None
        number = None
        if not self.dbNodes and dbNodes:
            self.dbNodes = dbNodes
        if upgrade:
            # Read the old version number: clusters <= 92.200 store the crc
            # as a signed 64-bit value, newer ones as unsigned 32-bit.
            staticConfigFilePath = os.path.split(filePath)[0]
            versionFile = os.path.join(
                staticConfigFilePath, "upgrade_version")
            version, number, commitid = VersionInfo.get_version_info(
                versionFile)
        try:
            if (dbNodes is None):
                dbNodes = self.dbNodes
            createFileInSafeMode(filePath)
            fp = open(filePath, "wb")
            # header length
            info = struct.pack("I", 28)
            # version
            info += struct.pack("I", BIN_CONFIG_VERSION_SINGLE_INST)
            # creation time
            info += struct.pack("q", int(time.time()))
            # node count
            info += struct.pack("I", len(dbNodes))
            # local node
            info += struct.pack("I", localNodeId)

            # Prepend the header crc; field width depends on the old version.
            crc = binascii.crc32(info)
            if upgrade:
                if float(number) <= 92.200:
                    info = struct.pack("q", crc) + info
                else:
                    info = struct.pack("I", crc) + info
            else:
                info = struct.pack("I", crc) + info
            fp.write(info)

            # Each node record starts on its own PAGE_SIZE-aligned offset.
            for dbNode in dbNodes:
                offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
                fp.seek(offset)

                info = self.__packNodeInfo(dbNode, number, upgrade=upgrade)
                fp.write(info)
            # Zero-pad the file up to the next page boundary.
            endBytes = PAGE_SIZE - fp.tell() % PAGE_SIZE
            if (endBytes != PAGE_SIZE):
                info = struct.pack("%dx" % endBytes)
                fp.write(info)
            fp.flush()
            fp.close()
            os.chmod(filePath, DIRECTORY_PERMISSION)
        except Exception as e:
            if fp:
                fp.close()
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50205"] % \
                            "static configuration file"
                            + " Error: \n%s" % str(e))

    def __packNodeInfo(self, dbNode, number, upgrade=False):
        """
        function : Pack one node record for the static config file. The
                   field order is a fixed binary layout and must not change.
        input : dbNode - node to serialize
                number - old version number (used only when upgrade is True)
                upgrade - selects 64-bit vs 32-bit crc field
        output : packed bytes (crc followed by the node payload)
        """
        # node id 
        info = struct.pack("I", dbNode.id)
        # node name
        info += struct.pack("64s", dbNode.name.encode("utf-8"))
        # az info
        info += struct.pack("64s", dbNode.azName.encode("utf-8"))
        info += struct.pack("I", dbNode.azPriority)
        # backIp
        info += self.__packIps(dbNode.backIps)
        # sshIp
        info += self.__packIps(dbNode.sshIps)
        # cm_server
        info += self.__packCmsInfo(dbNode)
        # cm_agent
        info += self.__packAgentInfo(dbNode)
        # gtm
        info += self.__packGtmInfo(dbNode)
        # cancel save gtmProxy info, need a placeholder
        info += self.__packGtmProxyInfo(dbNode)
        # cn
        info += self.__packCooInfo(dbNode)
        # dn
        info += self.__packDataNode(dbNode)
        # etcd
        info += self.__packEtcdInfo(dbNode)
        # cancel save sctp begin/end port, need a placeholder
        info += struct.pack("I", 0)
        info += struct.pack("I", 0)
        crc = binascii.crc32(info)

        # Same crc-width rule as the file header: versions <= 92.200 use a
        # signed 64-bit crc field, newer versions unsigned 32-bit.
        if upgrade:
            if float(number) <= 92.200:
                return struct.pack("q", crc) + info
            else:
                return struct.pack("I", crc) + info
        else:
            return struct.pack("I", crc) + info

    def __packEtcdInfo(self, dbNode):
        """
        function : Pack the etcd instance of a node for the static config
                   file. A zero-filled record of identical size is emitted
                   when the node hosts no etcd so the layout stays fixed.
        input : dbNode - node to serialize
        output : packed bytes
        """
        n = len(dbNode.etcds)

        info = "".encode()
        if (n == 0):
            # etcd count
            info += struct.pack("I", 0)
            # etcd id
            info += struct.pack("I", 0)
            # etcd mirror id
            info += struct.pack("i", 0)
            # etcd name
            info += struct.pack("64x")
            # datadir
            info += struct.pack("1024x")
            # listen ip
            info += self.__packIps([])
            # listen port
            info += struct.pack("I", 0)
            # ha ip
            info += self.__packIps([])
            # ha port
            info += struct.pack("I", 0)
        elif (n == 1):
            etcdInst = dbNode.etcds[0]
            # etcd count
            info += struct.pack("I", 1)
            # etcd id
            info += struct.pack("I", etcdInst.instanceId)
            # etcd mirror id
            info += struct.pack("i", etcdInst.mirrorId)
            # etcd name (bytes %-formatting fills in the instance id)
            info += struct.pack("64s", "etcd_%d".encode(
                "utf-8") % etcdInst.instanceId)
            # datadir
            info += struct.pack("1024s", etcdInst.datadir.encode("utf-8"))
            # listen ip
            info += self.__packIps(etcdInst.listenIps)
            # listen port
            info += struct.pack("I", etcdInst.port)
            # ha ip
            info += self.__packIps(etcdInst.haIps)
            # ha port
            info += struct.pack("I", etcdInst.haPort)
        else:
            # More than one etcd per node is not expected; pack nothing.
            pass

        return info

    def __packCmsInfo(self, dbNode):
        """
        function : Pack the cm_server instance of a node for the static
                   config file. A zero-filled record of identical size is
                   emitted when the node hosts no cm_server.
        input : dbNode - node to serialize
        output : packed bytes
        """
        n = len(dbNode.cmservers)

        info = "".encode()
        if (n == 0):
            # cm server id
            info += struct.pack("I", 0)
            # cm_server mirror id
            info += struct.pack("I", 0)
            # datadir
            info += struct.pack("1024s", dbNode.cmDataDir.encode("utf-8"))
            # cm server level
            info += struct.pack("I", 0)
            # float ip (zero-filled slot)
            info += struct.pack("128x")
            # listen ip
            info += self.__packIps([])
            # listen port
            info += struct.pack("I", 0)
            # local ha ip
            info += self.__packIps([])
            # local ha port
            info += struct.pack("I", 0)
            # instance type slot (kept zero here)
            info += struct.pack("I", 0)
            # peer ha ip
            info += self.__packIps([])
            # peer ha port
            info += struct.pack("I", 0)
        elif (n == 1):
            cmsInst = dbNode.cmservers[0]
            # cm server id
            info += struct.pack("I", cmsInst.instanceId)
            # cm_server mirror id
            info += struct.pack("I", cmsInst.mirrorId)
            # datadir
            info += struct.pack("1024s", dbNode.cmDataDir.encode("utf-8"))
            # cm server level
            info += struct.pack("I", cmsInst.level)
            # cm_server float ip
            info += struct.pack("128s", self.cmsFloatIp.encode("utf-8"))
            # listen ip
            info += self.__packIps(cmsInst.listenIps)
            # listen port
            info += struct.pack("I", cmsInst.port)
            # local ha ip
            info += self.__packIps(cmsInst.haIps)
            # local ha port
            info += struct.pack("I", cmsInst.haPort)
            # instance type
            info += struct.pack("I", cmsInst.instanceType)
            instances = self.getPeerInstance(cmsInst)
            peerInst = instances[0]
            # peer ha ip
            info += self.__packIps(peerInst.haIps)
            # peer ha port
            info += struct.pack("I", peerInst.haPort)
        else:
            # More than one cm_server per node is not expected; pack nothing.
            pass

        return info

    def __packAgentInfo(self, dbNode):
        """
        Pack the cm_agent instance of a node for the static config file.
        Empty bytes are returned when the node does not host exactly one
        agent.
        """
        if len(dbNode.cmagents) != 1:
            return "".encode()

        agent = dbNode.cmagents[0]
        # Layout: instanceId (uint32), mirrorId (int32), listen ip block.
        packed = struct.pack("I", agent.instanceId)
        packed += struct.pack("i", agent.mirrorId)
        packed += self.__packIps(agent.listenIps)
        return packed

    def __packGtmInfo(self, dbNode):
        """
        function : Pack the gtm instance of a node for the static config
                   file. A zero-filled record of identical size is emitted
                   when the node hosts no gtm.
        input : dbNode - node to serialize
        output : packed bytes
        """
        n = len(dbNode.gtms)

        info = "".encode()
        if (n == 0):
            # gtm id
            info += struct.pack("I", 0)
            # gtm mirror id
            info += struct.pack("I", 0)
            # gtm count
            info += struct.pack("I", 0)
            # datadir
            info += struct.pack("1024x")
            # listen ip
            info += self.__packIps([])
            # listen port
            info += struct.pack("I", 0)
            # instance type
            info += struct.pack("I", 0)
            # local ha ip
            info += self.__packIps([])
            # local ha port
            info += struct.pack("I", 0)
            # peer gtm datadir
            info += struct.pack("1024x")
            # peer ha ip
            info += self.__packIps([])
            # peer ha port
            info += struct.pack("I", 0)
        elif (n == 1):
            gtmInst = dbNode.gtms[0]
            # gtm id
            info += struct.pack("I", gtmInst.instanceId)
            # gtm mirror id
            info += struct.pack("I", gtmInst.mirrorId)
            # gtm count
            info += struct.pack("I", 1)
            # datadir
            info += struct.pack("1024s", gtmInst.datadir.encode("utf-8"))
            # listen ip
            info += self.__packIps(gtmInst.listenIps)
            # listen port
            info += struct.pack("I", gtmInst.port)
            # instance type
            info += struct.pack("I", gtmInst.instanceType)
            # local ha ip
            info += self.__packIps(gtmInst.haIps)
            # local ha port
            info += struct.pack("I", gtmInst.haPort)
            # peer gtm datadir (placeholder, not saved)
            info += struct.pack("1024x")
            # peer ha ip (placeholder, not saved)
            info += self.__packIps([])
            # peer ha port (placeholder, not saved)
            info += struct.pack("I", 0)

        else:
            # More than one gtm per node is not expected; pack nothing.
            pass

        return info

    def __packGtmProxyInfo(self, dbNode):
        """
        Pack a placeholder gtm-proxy record: the proxy is no longer saved,
        but its slot must remain in the binary layout.
        """
        filler = [struct.pack("I", 0),
                  struct.pack("I", 0),
                  struct.pack("I", 0),
                  self.__packIps([]),
                  struct.pack("I", 0)]
        return b"".join(filler)

    def __packCooInfo(self, dbNode):
        """
        function : Pack the coordinator instance of a node for the static
                   config file. A zero-filled record of identical size is
                   emitted when the node hosts no coordinator.
        input : dbNode - node to serialize
        output : packed bytes
        """
        n = len(dbNode.coordinators)

        info = "".encode()
        if (n == 0):
            # coordinator id
            info += struct.pack("I", 0)
            # coordinator mirror id
            info += struct.pack("i", 0)
            # coordinator count
            info += struct.pack("I", 0)
            # datadir
            info += struct.pack("1024x")
            # ssdDir
            info += struct.pack("1024x")
            # listen ip
            info += self.__packIps([])
            # listen port
            info += struct.pack("I", 0)
            # ha port
            info += struct.pack("I", 0)
        elif (n == 1):
            cooInst = dbNode.coordinators[0]
            # coordinator id
            info += struct.pack("I", cooInst.instanceId)
            # coordinator mirror id
            info += struct.pack("i", cooInst.mirrorId)
            # coordinator count
            info += struct.pack("I", 1)
            # datadir
            info += struct.pack("1024s", cooInst.datadir.encode("utf-8"))
            # ssdDir
            info += struct.pack("1024s", cooInst.ssdDir.encode("utf-8"))
            # listen ip
            info += self.__packIps(cooInst.listenIps)
            # listen port
            info += struct.pack("I", cooInst.port)
            # ha port
            info += struct.pack("I", cooInst.haPort)
        else:
            # More than one coordinator per node is not expected; pack nothing.
            pass

        return info

    def __packDataNode(self, dbNode):
        """
        function : Pack all datanode instances of a node for the static
                   config file. Every record carries the local instance
                   followed by a fixed number of peer slots
                   (MIRROR_COUNT_REPLICATION_MAX - 1), zero-padded when the
                   instance has fewer peers.
        input : dbNode - node to serialize
        output : packed bytes
        """

        # datanode count on this node
        info = struct.pack("I", len(dbNode.datanodes))
        for dnInst in dbNode.datanodes:
            instances = self.__getDNPeerInstance(dnInst)
            # datanode id
            info += struct.pack("I", dnInst.instanceId)
            # datanode mirror id
            info += struct.pack("I", dnInst.mirrorId)
            # datadir
            info += struct.pack("1024s", dnInst.datadir.encode("utf-8"))
            # xlogdir
            info += struct.pack("1024s", dnInst.xlogdir.encode("utf-8"))
            # ssdDir
            info += struct.pack("1024s", dnInst.ssdDir.encode("utf-8"))
            # listen ip
            info += self.__packIps(dnInst.listenIps)
            # port
            info += struct.pack("I", dnInst.port)
            # instance type
            info += struct.pack("I", dnInst.instanceType)
            # local ha ip
            info += self.__packIps(dnInst.haIps)
            # local ha port
            info += struct.pack("I", dnInst.haPort)

            maxStandbyCount = MIRROR_COUNT_REPLICATION_MAX - 1

            # Real peers first ...
            n = len(instances)
            for i in range(n):
                peerInst = instances[i]
                # peer datadir
                info += struct.pack("1024s", peerInst.datadir.encode("utf-8"))
                # peer ha ip
                info += self.__packIps(peerInst.haIps)
                # peer ha port
                info += struct.pack("I", peerInst.haPort)
                # instance type
                info += struct.pack("I", peerInst.instanceType)
            # ... then zero-filled slots up to the fixed standby count.
            for i in range(n, maxStandbyCount):
                # peer datadir
                info += struct.pack("1024x")
                # peer ha ip
                info += self.__packIps([])
                # peer ha port
                info += struct.pack("I", 0)
                # instance type
                info += struct.pack("I", 0)
        return info

    def __packIps(self, ips):
        """
        Pack an ip list: the count (uint32) followed by MAX_IP_NUM fixed
        128-byte slots, unused slots zero-filled.
        """
        count = len(ips)
        chunks = [struct.pack("I", count)]
        chunks.extend(struct.pack("128s", addr.encode("utf-8"))
                      for addr in ips)
        chunks.extend(struct.pack("128x")
                      for _ in range(count, MAX_IP_NUM))
        return b"".join(chunks)

    def isSingleInstCluster(self):
        """Return True when the cluster type is single-instance."""
        return self.clusterType == CLUSTER_TYPE_SINGLE_INST

    def isSingleNode(self):
        """Return True when the cluster has at most one datanode instance."""
        return (self.__getDnInstanceNum() <= 1)

    def doRefreshConf(self, user, localHostName, sshtool, logger=None):
        """
        Regenerate and distribute the dynamic cluster configuration:
        write cluster_dynamic_config, then cluster_dnrole_config, then
        reset replconninfo on every node (order matters).
        """
        self.__createDynamicConfig(user, localHostName, sshtool, logger)
        self.__create_simple_datanode_config(user, localHostName, sshtool)
        self.__reset_replconninfo(user, sshtool)

    def __createDynamicConfig(self, user, localHostName, sshtool, logger=None):
        """
        function : Save cluster info into to dynamic config and send the
                   file to every cluster node.
        input : user - cluster owner (used to resolve GAUSSHOME)
                localHostName - local node name (send source)
                sshtool - ssh helper for distribution
                logger - optional logger for per-instance debug output
        output : NA
        """
        # only one dn, no need to write primary or stanby node info
        dynamicConfigFile = self.__getDynamicConfig(user)
        # Remove any stale file before regenerating.
        if os.path.exists(dynamicConfigFile):
            cmd = "rm -f %s" % dynamicConfigFile
            (status, output) = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(ErrorCode.GAUSS_504["GAUSS_50407"] +
                                " Error: \n%s." % str(output) +
                                "The cmd is %s" % cmd)
        fp = None
        try:
            FileUtil.createFileInSafeMode(dynamicConfigFile)
            fp = open(dynamicConfigFile, "wb")
            # header length
            info = struct.pack("I", 24)
            # version
            info += struct.pack("I", BIN_CONFIG_VERSION_SINGLE_INST)
            # creation time
            info += struct.pack("q", int(time.time()))
            # node count
            info += struct.pack("I", len(self.dbNodes))
            # header crc
            crc = binascii.crc32(info)
            info = struct.pack("I", crc) + info
            fp.write(info)
            # Each node record starts on a PAGE_SIZE-aligned offset; the
            # cluster must contain exactly one primary datanode overall.
            primaryDnNum = 0
            for dbNode in self.dbNodes:
                offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
                fp.seek(offset)
                (primaryNodeNum, info) = self.__packDynamicNodeInfo(
                    dbNode, localHostName, sshtool, logger)
                primaryDnNum += primaryNodeNum
                fp.write(info)
            if primaryDnNum != 1:
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] %
                                ("master dn", "equal to 1"))
            # Zero-pad the file up to the next page boundary.
            endBytes = PAGE_SIZE - fp.tell() % PAGE_SIZE
            if endBytes != PAGE_SIZE:
                info = struct.pack("%dx" % endBytes)
                fp.write(info)
            fp.flush()
            fp.close()
            os.chmod(dynamicConfigFile, ConstantsBase.KEY_FILE_PERMISSION)
        except Exception as e:
            # Best effort: drop the partially written file before raising.
            if fp:
                fp.close()
            cmd = "rm -f %s" % dynamicConfigFile
            subprocess.getstatusoutput(cmd)
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50205"] % \
                            "dynamic configuration file"
                            + " Error: \n%s" % str(e))
        try:
            self.__sendDynamicCfgToAllNodes(localHostName,
                                            dynamicConfigFile,
                                            dynamicConfigFile)
        except Exception as e:
            # Distribution failed: remove the file from all nodes.
            cmd = "rm -f %s" % dynamicConfigFile
            sshtool.getSshStatusOutput(cmd, self.getClusterNodeNames())
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50205"] % \
                            "dynamic configuration file" +
                            " Error: \n%s" % str(e))

    def __create_simple_datanode_config(self, user, localhostname, sshtool):
        """
        Generate the cluster_dnrole_config file ("dnname=role-code" lines)
        from 'gs_om' status output and distribute it to every cluster node.

        :param user:          cluster owner used to resolve GAUSSHOME
        :param localhostname: local node name (send source)
        :param sshtool:       ssh helper used to broadcast the file
        """
        simpleDNConfig = self.__getDynamicSimpleDNConfig(user)
        if os.path.exists(simpleDNConfig):
            cmd = "rm -f %s" % simpleDNConfig
            (status, output) = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(ErrorCode.GAUSS_504["GAUSS_50407"] +
                                " Error: \n%s." % str(output) +
                                "The cmd is %s" % cmd)
        output_list = self.__getStatusByOM(user)
        # Match lines containing an IPv4 or IPv6 address: those are the
        # per-datanode status rows at the tail of the output.
        pattern = re.compile(r'(\d+) (.*) ((?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:[0-9a-fA-F]{0,4}(?::[0-9a-fA-F]{0,4})*::?(?:[0-9a-fA-F]{0,4})?)) (.*)')
        if not self.hasNoCm():
            output_list = [i for i in output_list if i]
            output_list = output_list[-1].split('|')
        output_num = sum(1 for line in output_list if pattern.search(line))
        # Bug fix: with output_num == 0 the old slice output_list[-0:] kept
        # the WHOLE list (and the split()[7] below then crashed); guard it.
        tempstatus = output_list[-output_num:] if output_num else []
        statusdic = {'Primary': 0, 'Standby': 1, 'Cascade': 3, 'Unknown': 9}
        try:
            with open(simpleDNConfig, "w") as fp:
                for dninfo in tempstatus:
                    # Field 7 is the DB role, field 1 the datanode name.
                    fields = dninfo.split()
                    dnstatus = fields[7]
                    dnname = fields[1]
                    fp.write("%s=%d\n" %
                             (dnname, statusdic.get(dnstatus,
                                                    statusdic['Unknown'])))
        except Exception as e:
            # Best effort: drop the partially written file before raising.
            cmd = "rm -f %s" % simpleDNConfig
            subprocess.getstatusoutput(cmd)
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50205"] %
                            "dynamic configuration file"
                            + " Error: \n%s" % str(e))
        try:
            self.__sendDynamicCfgToAllNodes(localhostname,
                                            simpleDNConfig,
                                            simpleDNConfig)
        except Exception as e:
            # Distribution failed: remove the file from all nodes.
            cmd = "rm -f %s" % simpleDNConfig
            sshtool.getSshStatusOutput(cmd, self.getClusterNodeNames())
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50205"] %
                            "dynamic configuration file" +
                            " Error: \n%s" % str(e))

    def __reset_replconninfo(self, user, sshtool):
        """
        Reset replconninfo on every cluster node (needed for cascade
        standby setups) by running the local Resetreplconninfo.py script
        over ssh, with a 120s timeout per node.
        """
        reset_script = os.path.dirname(os.path.realpath(__file__)) \
                       + '/../../local/Resetreplconninfo.py'
        reset_cmd = "python3 %s -U %s -t reset" % (reset_script, user)
        sshtool.setTimeOut(120)
        for host in self.getClusterNodeNames():
            status, output = sshtool.getSshStatusOutput(reset_cmd, [host])
            if status[host] != 'Success':
                raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"]
                                % reset_cmd + "Error:\n%s" % output)

    def __packDynamicNodeInfo(self, dbNode, localHostName, sshtool, logger=None):
        """
        Pack one node record for the dynamic config file, querying the live
        role of every datanode on the node.

        :return: (number of primary datanodes found on this node,
                  crc-prefixed packed bytes)
        """
        # node id
        info = struct.pack("I", dbNode.id)
        # node name
        info += struct.pack("64s", dbNode.name.encode("utf-8"))
        # datanode count on this node
        info += struct.pack("I", len(dbNode.datanodes))
        primaryNum = 0
        for dnInst in dbNode.datanodes:
            # Refresh dnInst.localRole/state from the running instance.
            self.__getDnState(dnInst, dbNode, localHostName, sshtool, logger)
            instanceType = 0
            if dnInst.localRole == "Primary":
                instanceType = MASTER_INSTANCE
                primaryNum += 1
            elif dnInst.localRole == "Cascade Standby":
                instanceType = CASCADE_STANDBY
            else:
                instanceType = STANDBY_INSTANCE
            if logger:
                logger.debug(f"""instanceInfo: name: {dnInst.hostname}, \
                         role: {dnInst.localRole}, \
                         state: {dnInst.state}""")
            info += struct.pack("I", dnInst.instanceId)
            # datanode id
            info += struct.pack("I", dnInst.mirrorId)
            # instanceType such as master, standby, dumpstandby
            info += struct.pack("I", instanceType)
            # datadir
            info += struct.pack("1024s", dnInst.datadir.encode("utf-8"))
        # trailing placeholder fields
        info += struct.pack("I", 0)
        info += struct.pack("I", 0)
        crc = binascii.crc32(info)
        return (primaryNum, struct.pack("I", crc) + info)

    def __getClusterSwitchTime(self, dynamicConfigFile):
        """
        function : get cluster version information
                   from static configuration file
        input : String
        output : version
        """
        fp = None
        try:
            fp = open(dynamicConfigFile, "rb")
            info = fp.read(24)
            (crc, lenth, version, switchTime, nodeNum) = \
                struct.unpack("=IIIqi", info)
            fp.close()
        except Exception as e:
            if fp:
                fp.close()
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"]
                            + " Error: \n%s." % str(e))
        return switchTime

    def __getDynamicConfig(self, user):
        """
        Build the path of the cluster_dynamic_config file under the
        resolved GAUSSHOME of *user*.
        """
        gauss_home = self.__getEnvironmentParameterValue("GAUSSHOME", user)
        if not gauss_home:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                            ("installation path of designated user [%s]"
                             % user))
        # During upgrade GAUSSHOME may point at a transient location when the
        # user chose a strategy, so always resolve to the real path.
        return "%s/bin/cluster_dynamic_config" % os.path.realpath(gauss_home)
    def __getDynamicSimpleDNConfig(self, user):
        """
        Build the path of the cluster_dnrole_config file under the
        resolved GAUSSHOME of *user*.
        """
        gauss_home = self.__getEnvironmentParameterValue("GAUSSHOME", user)
        if not gauss_home:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                            ("installation path of designated user [%s]"
                             % user))
        # During upgrade GAUSSHOME may point at a transient location when the
        # user chose a strategy, so always resolve to the real path.
        return "%s/bin/cluster_dnrole_config" % os.path.realpath(gauss_home)

    def dynamicConfigExists(self, user):
        """
        function : check whether the dynamic config file exists for user
        input : String user
        output : True when the file is present on disk, False otherwise
        """
        return os.path.exists(self.__getDynamicConfig(user))

    def checkClusterDynamicConfig(self, user, localHostName):
        """
        function : make all the node dynamic config file is newest.
                   Pulls each remote node's dynamic config file to the
                   local node, picks the copy with the latest switch time,
                   and redistributes it when the copies disagree.
        input : String user - cluster owner OS user
                String localHostName - name of the node we run on
        output : none
        """
        # A cluster with at most one DN cannot have diverging copies.
        if self.__getDnInstanceNum() <= 1:
            return
        gaussHome = self.__getEnvironmentParameterValue("GAUSSHOME", user)
        if gaussHome == "":
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                            ("installation path of designated user [%s]"
                             % user))
        # if under upgrade, and use chose strategy, we may get a wrong path,
        # so we will use the realpath of gausshome
        gaussHome = os.path.realpath(gaussHome)
        dynamicConfigFile = "%s/bin/cluster_dynamic_config" % gaussHome
        # Track the newest switch time seen so far and which file holds it.
        lastSwitchTime = 0
        lastDynamicConfigFile = ""
        fileConsistent = False
        fileExist = False
        if os.path.exists(dynamicConfigFile):
            lastSwitchTime = self.__getClusterSwitchTime(dynamicConfigFile)
            lastDynamicConfigFile = dynamicConfigFile
            fileExist = True
            fileConsistent = True
        for dbNode in self.dbNodes:
            # Each remote copy is staged locally under a per-node name.
            remoteDynamicConfigFile = "%s/bin/cluster_dynamic_config_%s" \
                                      % (gaussHome, dbNode.name)
            if dbNode.name != localHostName:
                node_ip = dbNode.sshIps[0]
                if get_ip_version(node_ip) == NET_IPV6:
                    # scp file is to the ipv6 address, needs to add [] to ipaddress:
                    # scp a.txt [2407:c080:1200:22a0:613f:8d3b:caa:2335]:/data
                    node_ip = "[" + node_ip + "]"
                cmd = "export LD_LIBRARY_PATH=/usr/lib64;/usr/bin/scp %s:%s %s" % (
                    node_ip, dynamicConfigFile, remoteDynamicConfigFile)
                status, output = subprocess.getstatusoutput(cmd)
                if status:
                    # A node without the file just marks the set inconsistent;
                    # any other scp failure is fatal.
                    if output.find("No such file or directory") >= 0:
                        fileConsistent = False
                        continue
                    raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd
                                    + " Error:\n" + output)
                if os.path.exists(remoteDynamicConfigFile):
                    fileExist = True
                    switchTime = self.__getClusterSwitchTime(
                        remoteDynamicConfigFile)
                    # Any difference in switch time (either direction) means
                    # the copies are inconsistent; a newer one becomes the
                    # candidate to distribute.
                    if switchTime > lastSwitchTime:
                        lastSwitchTime = switchTime
                        lastDynamicConfigFile = remoteDynamicConfigFile
                        fileConsistent = False
                    elif switchTime < lastSwitchTime:
                        fileConsistent = False
        # if dynamic config file exist, but file time is not same,
        # send the valid file to all nodes
        if fileExist:
            if not fileConsistent:
                self.__sendDynamicCfgToAllNodes(localHostName,
                                                lastDynamicConfigFile,
                                                dynamicConfigFile)
            # Best-effort removal of the staged per-node copies; the exit
            # status is deliberately ignored.
            cleanCmd = "rm -f %s/bin/cluster_dynamic_config_*" % gaussHome
            subprocess.getstatusoutput(cleanCmd)

    def __sendDynamicCfgToAllNodes(self,
                                   localHostName,
                                   sourceFile,
                                   targetFile):
        """
        function : distribute the newest dynamic config file to every node
        input : localHostName - name of the node we are running on
                sourceFile - path of the newest dynamic config file
                targetFile - destination path (cluster_dynamic_config)
        output : NA
        """
        status = 0
        output = ""
        # Pre-assign cmd so the error message below can never reference an
        # unbound name.
        cmd = ""
        for dbNode in self.dbNodes:
            if dbNode.name == localHostName:
                # Local node: a plain copy is enough; skip it entirely when
                # the newest file is already at the target location.
                if sourceFile != targetFile:
                    cmd = "cp -f  %s %s" % (sourceFile, targetFile)
                    status, output = subprocess.getstatusoutput(cmd)
            else:
                # dbNode already carries its ssh IPs; the former
                # getDbNodeByName(dbNode.name) round-trip was a redundant
                # linear search for the node we are iterating over.
                node_ip = dbNode.sshIps[0]
                if get_ip_version(node_ip) == NET_IPV6:
                    # scp file is to the ipv6 address, needs to add [] to ipaddress:
                    # scp a.txt [2407:c080:1200:22a0:613f:8d3b:caa:2335]:/data
                    node_ip = "[" + node_ip + "]"
                cmd = "export LD_LIBRARY_PATH=/usr/lib64;/usr/bin/scp %s %s:%s" % (sourceFile, node_ip, targetFile)
                status, output = subprocess.getstatusoutput(cmd)
            if status:
                raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd +
                                " Error:\n" + output)

    def readDynamicConfig(self, user):
        """
        function : read cluster information from dynamic configuration file;
                   only used for starting the cluster after a switchover
        input : String user - OS user owning the cluster
        output : NA
        """
        # Pre-assign so the except handler can always format the message,
        # even when the failure happens before the path is resolved
        # (previously that raised UnboundLocalError and hid the real error).
        dynamicConfigFile = ""
        try:
            self.name = self.__getEnvironmentParameterValue("GS_CLUSTER_NAME",
                                                            user)
            self.appPath = self.__getEnvironmentParameterValue("GAUSSHOME",
                                                               user)
            logPathWithUser = self.__getEnvironmentParameterValue("GAUSSLOG",
                                                                  user)
            splitMark = "/%s" % user
            # set log path without user
            # find the path from right to left
            self.logPath = \
                logPathWithUser[0:(logPathWithUser.rfind(splitMark))]
            dynamicConfigFile = self.__getDynamicConfig(user)
            # The upgrade_version file lives next to the dynamic config and
            # selects the on-disk header layout below.
            dynamicConfigFilePath = os.path.split(dynamicConfigFile)[0]
            versionFile = os.path.join(
                dynamicConfigFilePath, "upgrade_version")
            version, number, commitid = VersionInfo.get_version_info(
                versionFile)
            # 'with' guarantees the file is closed on all paths.
            with open(dynamicConfigFile, "rb") as fp:
                if float(number) <= 92.200:
                    # Old layout stores the crc as 8 bytes ('q').
                    info = fp.read(28)
                    (crc, lenth, version, currenttime, nodeNum) = \
                        struct.unpack("=qIIqi", info)
                else:
                    info = fp.read(24)
                    (crc, lenth, version, currenttime, nodeNum) = \
                        struct.unpack("=IIIqi", info)
                totalMaterDnNum = 0
                for i in range(nodeNum):
                    # Each node record starts on the next page boundary.
                    offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
                    fp.seek(offset)
                    (dbNode, materDnNum) = \
                        self.__unpackDynamicNodeInfo(fp, number)
                    totalMaterDnNum += materDnNum
                    self.dbNodes.append(dbNode)
            # Exactly one master DN must exist cluster-wide.
            if totalMaterDnNum != 1:
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] %
                                ("master dn", "1"))
        except Exception as e:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] %
                            dynamicConfigFile + " Error:\n" + str(e))

    def __unpackDynamicNodeInfo(self, fp, number):
        """
        Unpack one node record from an open dynamic configuration file.

        input : fp - binary file object positioned at a node record
                number - cluster version number string (layout selector)
        output : (dbNodeInfo, count of master DN instances on this node)
        """
        # Versions <= 92.200 store the record crc as 8 bytes ('q'),
        # newer versions as 4 bytes ('I').
        if float(number) <= 92.200:
            info = fp.read(76)
            (crc, nodeId, nodeName) = struct.unpack("=qI64s", info)
        else:
            info = fp.read(72)
            (crc, nodeId, nodeName) = struct.unpack("=II64s", info)
        # Node name is a fixed 64-byte field, NUL padded.
        nodeName = nodeName.decode().strip('\x00')
        dbNode = dbNodeInfo(nodeId, nodeName)
        info = fp.read(4)
        (dataNodeNums,) = struct.unpack("=I", info)
        dbNode.datanodes = []
        materDnNum = 0
        for i in range(dataNodeNums):
            dnInst = instanceInfo()
            dnInst.hostname = nodeName
            # Per-DN fixed header: instanceId, mirrorId, instanceType.
            info = fp.read(12)
            (dnInst.instanceId, dnInst.mirrorId, dnInst.instanceType) = \
                struct.unpack("=III", info)
            if dnInst.instanceType == MASTER_INSTANCE:
                materDnNum += 1
            elif dnInst.instanceType not in [STANDBY_INSTANCE,
                                             DUMMY_STANDBY_INSTANCE, CASCADE_STANDBY]:
                # Any other role value indicates a corrupt/unknown record.
                raise Exception(ErrorCode.GAUSS_512["GAUSS_51204"] %
                                ("DN", dnInst.instanceType))
            # Data directory is a fixed 1024-byte NUL-padded field.
            info = fp.read(1024)
            (datadir,) = struct.unpack("=1024s", info)
            dnInst.datadir = datadir.decode().strip('\x00')
            dbNode.datanodes.append(dnInst)
        return (dbNode, materDnNum)

    def hasNoCm(self):
        """
        function:check whether cm exist
        :return:True or False
        """
        return self.cmscount < 1

    def getDbNodeByID(self, inputid):
        """
        function : Get node by id.
        input : nodename
        output : []
        """
        for dbNode in self.dbNodes:
            if dbNode.id == inputid:
                return dbNode
        return None

    def __read_cluster_float_ips(self, dn_float_ips):
        """
        Read cluster global info(float IP) to dbClusterInfo.

        input : dn_float_ips - iterable of mappings whose keys are float-IP
                resource names; each unseen name is resolved from the
                CLUSTER section of the XML and cached in self.float_ips
        """
        # NOTE(review): xmlRootNode is not defined in this method or its
        # parameters; it is presumably a module-level global populated
        # during XML parsing — confirm before restructuring this code.
        for ips_tmp in dn_float_ips:
            for res_name in ips_tmp:
                # Only resolve names we have not cached yet.
                if res_name not in self.float_ips:
                    ret_status, ret_value = self.readOneClusterConfigItem(
                                            xmlRootNode, res_name, "CLUSTER")
                    if ret_status == 0:
                        self.float_ips[res_name] = ret_value.strip()
                    else:
                        raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                                        "float IP." + " Error: \n%s" % ret_value)

    def printStaticConfig(self, xmlFile, fileName=""):
        """
        function : print the cluster configuration parsed from an XML file
                   in the static-config report layout
        input : String xmlFile - cluster XML configuration file
                String fileName - optional output file (defaults to stdout
                behavior of __fprintContent when empty)
        output : NA
        """
        clusterInfo = dbClusterInfo()
        clusterInfo.initFromXml(xmlFile)
        try:
            # Accumulate the report in a list and join once at the end:
            # repeated str += was quadratic in the report size.
            parts = ["NodeHeader:\n"]
            parts.append("version:%u\n" % clusterInfo.version)
            parts.append("time:%ld\n" % clusterInfo.installTime)
            parts.append("nodeCount:%u\n" % clusterInfo.nodeCount)
            parts.append("node:%u\n" % clusterInfo.localNodeId)
            parts.append("=" * 60 + "\n")
            dnTotalNum = self.__getDnInstanceNum()
            for dbNode in self.dbNodes:
                parts.append("azName:%s\n" % dbNode.azName)
                parts.append("azPriority:%u\n" % dbNode.azPriority)
                parts.append("node :%u\n" % dbNode.id)
                parts.append("nodeName:%s\n" % dbNode.name)

                parts.append("ssh channel :\n")
                for j, sshIp in enumerate(dbNode.sshIps, 1):
                    parts.append("sshChannel %u:%s\n" % (j, sshIp))
                parts.append("datanodeCount :%u\n" % len(dbNode.datanodes))
                for j, dnInst in enumerate(dbNode.datanodes, 1):
                    parts.append("datanodeInstanceType :%s\n" %
                                 DICT_INSTANCE[dnInst.instanceType])
                    parts.append("datanode %u:\n" % j)
                    parts.append(
                        "datanodeLocalDataPath :%s\n" % dnInst.datadir)
                    parts.append(
                        "datanodeXlogPath :%s\n" % dnInst.xlogdir)
                    for k, listenIp in enumerate(dnInst.listenIps, 1):
                        parts.append(
                            "datanodeListenIP %u:%s\n" % (k, listenIp))
                    parts.append("datanodePort :%u\n" % dnInst.port)
                    for k, haIp in enumerate(dnInst.haIps, 1):
                        parts.append(
                            "datanodeLocalHAIP %u:%s\n" % (k, haIp))
                    parts.append(
                        "datanodeLocalHAPort :%u\n" % dnInst.haPort)
                    parts.append(
                        "dn_replication_num: %u\n" % dnTotalNum)
                    # Peer output is capped at MIRROR_COUNT_REPLICATION_MAX
                    # entries regardless of the node count.
                    maxPeerNum = min(self.nodeCount,
                                     MIRROR_COUNT_REPLICATION_MAX)
                    for k in range(maxPeerNum - 1):
                        peer = dnInst.peerInstanceInfos[k]
                        parts.append("datanodePeer%uDataPath :%s\n" %
                                     (k, peer.peerDataPath))
                        for m, peerHaIP in enumerate(peer.peerHAIPs, 1):
                            parts.append("datanodePeer%uHAIP %u:%s\n" %
                                         (k, m, peerHaIP))
                        parts.append("datanodePeer%uHAPort :%u\n" %
                                     (k, peer.peerHAPort))

                    parts.append("=" * 60 + "\n")
            self.__fprintContent("".join(parts), fileName)
        except Exception as e:
            raise Exception(ErrorCode.GAUSS_516["GAUSS_51652"] % str(e))
    
    def doRebuildConf(self, xmlFile):
        """
        generating static configuration files for all nodes

        input : String xmlFile - cluster XML configuration file
        output : NA
        raises : Exception when directory creation, file generation or
                 distribution fails; the temporary directory is removed
                 on failure
        """
        try:
            tmpDirName = ""

            clusterInfo = dbClusterInfo()
            clusterInfo.initFromXml(xmlFile)

            # Stage one static config file per node in a local temporary
            # directory next to this script.
            dirName = os.path.dirname(os.path.realpath(__file__))
            tmpDirName = os.path.realpath("%s/static_config_files" % dirName)
            cmd = "mkdir -p -m %s '%s'" % (
                KEY_DIRECTORY_MODE, tmpDirName)
            (status, output) = subprocess.getstatusoutput(cmd)
            if (status != 0):
                raise Exception(
                    ErrorCode.GAUSS_502["GAUSS_50208"]
                    % "temporary directory" + "\nCommand:%s\nError: %s"
                    % (cmd, output))

            for dbNode in self.dbNodes:
                staticConfigPath = "%s/cluster_static_config_%s" % (
                    tmpDirName, dbNode.name)
                clusterInfo.saveToStaticConfig(staticConfigPath,
                                                            dbNode.id)

            # Distribute each generated file to its node's $GAUSSHOME/bin
            # as cluster_static_config (scp for remote, cp for local).
            for dbNode in clusterInfo.dbNodes:
                if (dbNode.name != GetHostIpOrName()):
                    cmd = 'ssh -q -o ConnectTimeout=5 %s mkdir -p %s/bin' % (dbNode.sshIps[0], clusterInfo.appPath)
                    (status, output) = subprocess.getstatusoutput(cmd)
                    if status != 0:
                        # NOTE(review): a failed remote mkdir only prints and
                        # continues; the subsequent scp will then fail and
                        # raise — confirm this best-effort step is intended.
                        print("ERROR")
                    cmd = \
                        "scp %s/cluster_static_config_%s %s:%s/bin/cluster_static_config" % (
                            tmpDirName,
                            dbNode.name, dbNode.sshIps[0], clusterInfo.appPath)
                else:
                    cmd = \
                        "mkdir -p %s/bin; cp %s/cluster_static_config_%s %s" \
                        "/bin/cluster_static_config" % (
                            clusterInfo.appPath, tmpDirName,
                            dbNode.name, clusterInfo.appPath)
                (status, output) = subprocess.getstatusoutput(cmd)
                if status != 0:
                    raise Exception(
                        ErrorCode.GAUSS_502["GAUSS_50216"]
                        % "static configuration file"
                        + "Node: %s.\nCommand: \n%s\nError: \n%s"
                        % (dbNode.name, cmd, output))

        except Exception as e:
            # Best-effort cleanup of the staging directory before re-raising.
            removeDirectory(tmpDirName)
            raise Exception(str(e))

    def execute_on_node(self, node, cmd, local_name=None, ssh_ip=None):
        """
        Run a shell command on the given node.

        node: node name (or IP)
        cmd: the command to run (without any ssh prefix)
        local_name: local host name or IP, if already known
        ssh_ip: explicit ssh IP for the remote node, if any
        returns: (status, output) from subprocess.getstatusoutput
        """
        is_local = node == local_name or node == GetHostIpOrName()
        if not is_local:
            # Remote execution: wrap the command in a quick-timeout ssh call.
            target = ssh_ip if ssh_ip else node
            cmd = f"ssh -q -o ConnectTimeout=5 {target} '{cmd}'"
        return subprocess.getstatusoutput(cmd)

    def gr_install(self, xmlFile, PkgPath):
        """
        Install the oGRecorder (gr) component on every cluster node.

        Steps: create install dirs -> distribute and unpack the package ->
        write per-node config -> stage bin/lib -> create CM flag file ->
        generate the cm_ctl resource script -> generate/distribute certs
        and configs -> distribute helper tools and scripts.

        input : String xmlFile - cluster XML configuration file
                String PkgPath - path of the gr installation package
        output : NA

        NOTE(review): several log strings contain typos ("insatll",
        "failed tp") that a comment-only pass cannot change.
        """
        user = getpass.getuser()
        gaussHome = self.__getEnvironmentParameterValue("GAUSSHOME", user)
        if (gaussHome == ""):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                            ("installation path of designated user [%s]" % user))
        clusterInfo = dbClusterInfo()
        clusterInfo.initFromXml(xmlFile)
        # Initialize the logging module
        self.initLogger("install")

        self.logger.log("Start insatll oGRecorder.")
        pkgName = os.path.basename(PkgPath)
        cfg_path = os.path.join(clusterInfo.grPath, "cfg", "gr_inst.ini")
        parent_dir = os.path.dirname(cfg_path)
        # Create the installation directories on every node
        for dbNode in clusterInfo.dbNodes:
            cmd = "mkdir -p %s/script %s/data %s" % (clusterInfo.toolPath, clusterInfo.grPath, parent_dir)
            status, output = self.execute_on_node(
                dbNode.name, cmd, local_name=GetHostIpOrName(), ssh_ip=dbNode.sshIps[0])
            if status != 0:
                raise Exception(f"Failed to create install path, Error output:\n{output}")
        self.logger.log("Successfully create install path.")

        # Distribute the package to every node
        for dbNode in clusterInfo.dbNodes:
            if dbNode.name != GetHostIpOrName():
                cmd = "scp %s %s:%s" % (PkgPath, dbNode.sshIps[0], clusterInfo.toolPath)
                status, output = subprocess.getstatusoutput(cmd)
            else:
                cmd = 'cp %s %s' % (PkgPath, clusterInfo.toolPath)
                status, output = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(f"failed to distribute install gr pkg, Error output:\n{output}")
        self.logger.log("Successfully distribute install gr pkg.")

        # Unpack the package on every node
        for dbNode in clusterInfo.dbNodes:
            cmd = 'tar -zxf %s/%s -C %s' % (clusterInfo.toolPath, pkgName, clusterInfo.toolPath)
            status, output = self.execute_on_node(
                dbNode.name, cmd, local_name=GetHostIpOrName(), ssh_ip=dbNode.sshIps[0])
            if status != 0:
                raise Exception(f"failed to decompress gr pkg, Error output:\n{output}")
        self.logger.log("Successfully decompress gr pkg.")

        # Write the per-node configuration file (INST_ID is the node index)
        for i, dbNode in enumerate(clusterInfo.dbNodes):
            config_content = (
                f"INST_ID={i}\n"
                f"LOG_LEVEL=255\n"
                f"gr_nodes_list={clusterInfo.gr_nodes_list}\n"
                f"LISTEN_ADDR={dbNode.listen_addr}\n"
                f"IP_WHITE_LIST={dbNode.sshIps[0]},127.0.0.1\n"
                f"DATA_FILE_PATH={clusterInfo.wormPath}\n"
                f"GR_CM_SO_NAME=libclient.so\n"
            )
            cmd = f"printf \"%s\" \"{config_content}\" > {cfg_path}"
            status, output = self.execute_on_node(
                dbNode.name, cmd, local_name=GetHostIpOrName(), ssh_ip=dbNode.sshIps[0])
            if status != 0:
                self.logger.error(f"Failed to create/write gr config file on {dbNode.name}, output:\n{output}")
                raise Exception(f"failed to create/write gr config file, Error output:\n{output}")
        self.logger.log("Successfully create and write gr config file.")

        # Stage the unpacked bin/ and lib/ under grPath
        for dbNode in clusterInfo.dbNodes:
            cmd = 'cp -r %s/bin %s/lib %s' % (clusterInfo.toolPath, clusterInfo.toolPath, clusterInfo.grPath)
            status, output = self.execute_on_node(
                dbNode.name, cmd, local_name=GetHostIpOrName(), ssh_ip=dbNode.sshIps[0])
            if status != 0:
                raise Exception(f"failed to add gr bin and lib, Error output:\n{output}")
        self.logger.log("Successfully add gr bin and lib.")

        # Create the CM flag file (cluster_manual_walrecord)
        for dbNode in clusterInfo.dbNodes:
            cmd = 'touch %s/bin/cluster_manual_walrecord' % (clusterInfo.appPath)
            status, output = self.execute_on_node(
                dbNode.name, cmd, local_name=GetHostIpOrName(), ssh_ip=dbNode.sshIps[0])
            if status != 0:
                raise Exception(f"failed to create cluster_manual_walrecord file, Error output:\n{output}")
        self.logger.log("Successfully create cluster_manual_walrecord file.")

        # Generate the cm_ctl resource-registration script (gr_res.sh):
        # one "add" command plus one "edit" per node instance.
        add_cmd = (
            'cm_ctl res --add --res_name="gr" '
            '--res_attr="resources_type=APP,'
            'script={app_path}/bin/gr_contrl.sh,'
            'check_interval=1,'
            'timeout=120,'
            'restart_times=5,'
            'restart_delay=1,'
            'restart_period=1"'
        ).format(app_path=clusterInfo.grPath)

        edit_cmds = []
        for i, dbNode in enumerate(clusterInfo.dbNodes):
            edit_cmd = (
                'cm_ctl res --edit --res_name="gr" '
                '--add_inst="node_id={node_id},'
                'res_instance_id={instance_id},'
                'res_args={gr_path}"'
            ).format(
                node_id=dbNode.id,
                instance_id=i,
                gr_path=clusterInfo.grPath
            )
            edit_cmds.append(edit_cmd)

        all_cmds = [add_cmd] + edit_cmds
        for dbNode in clusterInfo.dbNodes:
            # Remove any stale gr_res.sh (and CM data dir contents) first
            cmd = 'rm -rf %s/gr_res.sh %s/*' % (clusterInfo.toolPath, dbNode.cmDataDir)
            status, output = self.execute_on_node(
                dbNode.name, cmd, local_name=GetHostIpOrName(), ssh_ip=dbNode.sshIps[0])
            if status != 0:
                raise Exception(f"failed to clean gr_res.sh file, Error output:\n{output}")
            # Append each resource command to the script
            for res_cmd in all_cmds:
                cmd = f"echo '{res_cmd}' >> {clusterInfo.toolPath}/gr_res.sh"
                status, output = self.execute_on_node(
                    dbNode.name, cmd, local_name=GetHostIpOrName(), ssh_ip=dbNode.sshIps[0])
                if status != 0:
                    raise Exception(f"failed to create gr_res.sh file, Error output:\n{output}")
        self.logger.log("Successfully create gr_res.sh file.")


        # Generate the base configuration only on the main node
        main_node = clusterInfo.dbNodes[0]  # the first node acts as main
        main_config_content = (
            f"INST_ID=0\n"
            f"LOG_LEVEL=255\n"
            f"gr_nodes_list={clusterInfo.gr_nodes_list}\n"
            f"LISTEN_ADDR={main_node.listen_addr}\n"
            f"IP_WHITE_LIST={main_node.sshIps[0]},127.0.0.1\n"
            f"DATA_FILE_PATH={clusterInfo.wormPath}\n"
            f"GR_CM_SO_NAME=libclient.so\n"
        )

        # Write the base config file on the main node
        main_config_path = f"{parent_dir}/gr_inst.ini"
        cli_config_path = f"{parent_dir}/gr_cli_inst.ini"
        with open(main_config_path, 'w') as f:
            f.write(main_config_content)

        self.logger.log("Successfully create initial config file on main node.")

        # Generate certificates (grcmd appends cert-related settings to the
        # config file); remote nodes receive a copy of the local CA dir.
        envfile = self.__getEnvironmentParameterValue("MPPDB_ENV_SEPARATE_PATH", user)
        ca_path = clusterInfo.grPath
        if not ca_path.endswith("CA"):
            ca_path = os.path.join(ca_path, "CA")
        for dbNode in clusterInfo.dbNodes:
            if (dbNode.name != GetHostIpOrName()):
                cmd = "scp -r {certPath}/ {host}:{certPath}".format(certPath=ca_path, host=dbNode.sshIps[0])
            else:
                cmd = 'source %s && grcmd gencert -t server -d 1000 && grcmd gencert -t client -d 1000' % (envfile)
            (status, output) = subprocess.getstatusoutput(cmd)
            if (status != 0):
                raise Exception(f"failed to create gr cert, Error output:\n{output}, cmd is {cmd}")

        # Generate the correct per-node config file and distribute it
        for i, dbNode in enumerate(clusterInfo.dbNodes):
            # Re-read the main node's config (it now contains cert settings)
            with open(main_config_path, 'r') as f:
                config_content = f.read()

            # Patch INST_ID, LISTEN_ADDR and IP_WHITE_LIST for this node
            config_content = config_content.replace(f"INST_ID=0", f"INST_ID={i}")
            config_content = config_content.replace(f"LISTEN_ADDR={main_node.listen_addr}", f"LISTEN_ADDR={dbNode.listen_addr}")
            config_content = config_content.replace(f"IP_WHITE_LIST={main_node.sshIps[0]},127.0.0.1", f"IP_WHITE_LIST={dbNode.sshIps[0]},127.0.0.1")

            # Write the node-specific gr_inst.ini locally
            local_config_path = f"{parent_dir}/gr_inst_{dbNode.name}.ini"
            with open(local_config_path, 'w') as f:
                f.write(config_content)

            # Push the node-specific config to its node
            cmd = f"scp {local_config_path} {dbNode.sshIps[0]}:{parent_dir}/gr_inst.ini"
            status, output = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(f"failed to distribute gr_inst.ini to {dbNode.name}, Error output:\n{output}")

            # Push gr_cli_inst.ini (the main node's copy is used as-is).
            # NOTE(review): cli_config_path is never written locally above;
            # presumably grcmd gencert creates it — verify.
            cmd = f"scp {cli_config_path} {dbNode.sshIps[0]}:{parent_dir}/gr_cli_inst.ini"
            status, output = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(f"failed to distribute gr_cli_inst.ini to {dbNode.name}, Error output:\n{output}")

            # Clean up the local temporary file
            if os.path.exists(local_config_path):
                os.remove(local_config_path)

        self.logger.log("Successfully create gr cert.")

        # Distribute the gr_om binary
        for dbNode in clusterInfo.dbNodes:
            if dbNode.name != GetHostIpOrName():
                cmd = "scp gr_om %s:%s/bin" % (dbNode.sshIps[0], clusterInfo.appPath)
                status, output = subprocess.getstatusoutput(cmd)
            else:
                cmd = 'cp gr_om %s/bin' % (clusterInfo.appPath)
                status, output = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(f"failed tp distribute gr_om, Error output:\n{output}")
        self.logger.log("Successfully distribute gr_om.")

        # Distribute the pssh tools and mark them executable
        for dbNode in clusterInfo.dbNodes:
            if dbNode.name != GetHostIpOrName():
                cmd = "scp -r pssh/bin/* %s:%s/bin" % (dbNode.sshIps[0], clusterInfo.appPath)
                status, output = subprocess.getstatusoutput(cmd)
                if status == 0:
                    # Set execute permissions for pssh binaries
                    cmd = "ssh -q -o ConnectTimeout=5 %s 'chmod +x %s/bin/*'" % (dbNode.sshIps[0], clusterInfo.appPath)
                    status, output = subprocess.getstatusoutput(cmd)
            else:
                cmd = 'cp -r pssh/bin/* %s/bin' % (clusterInfo.appPath)
                status, output = subprocess.getstatusoutput(cmd)
                if status == 0:
                    # Set execute permissions for pssh binaries
                    cmd = 'chmod +x %s/bin/*' % (clusterInfo.appPath)
                    status, output = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(f"failed to distribute pssh, Error output:\n{output}")
        self.logger.log("Successfully distribute pssh.")

        # Distribute the gr_contrl.sh control script
        for dbNode in clusterInfo.dbNodes:
            if dbNode.name != GetHostIpOrName():
                cmd = "scp gr_contrl.sh %s:%s/bin" % (dbNode.sshIps[0], clusterInfo.appPath)
                status, output = subprocess.getstatusoutput(cmd)
                if status == 0:
                    # Set execute permissions for gr_contrl.sh
                    cmd = "ssh -q -o ConnectTimeout=5 %s 'chmod +x %s/bin/gr_contrl.sh'" % (dbNode.sshIps[0], clusterInfo.appPath)
                    status, output = subprocess.getstatusoutput(cmd)
            else:
                cmd = 'cp gr_contrl.sh %s/bin' % (clusterInfo.appPath)
                status, output = subprocess.getstatusoutput(cmd)
                if status == 0:
                    # Set execute permissions for gr_contrl.sh
                    cmd = 'chmod +x %s/bin/gr_contrl.sh' % (clusterInfo.appPath)
                    status, output = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(f"failed to distribute gr_contrl.sh, Error output:\n{output}")
        self.logger.log("Successfully distribute gr_contrl.sh.")

        # Distribute the py_pstree.py helper script
        for dbNode in clusterInfo.dbNodes:
            if dbNode.name != GetHostIpOrName():
                cmd = "scp py_pstree.py %s:%s/script/" % (dbNode.sshIps[0], clusterInfo.toolPath)
                status, output = subprocess.getstatusoutput(cmd)
                if status == 0:
                    # Set execute permissions for py_pstree.py
                    cmd = "ssh -q -o ConnectTimeout=5 %s 'chmod +x %s/script/py_pstree.py'" % (dbNode.sshIps[0], clusterInfo.toolPath)
                    status, output = subprocess.getstatusoutput(cmd)
            else:
                cmd = 'cp py_pstree.py %s/script' % (clusterInfo.toolPath)
                status, output = subprocess.getstatusoutput(cmd)
                if status == 0:
                    # Set execute permissions for py_pstree.py
                    cmd = 'chmod +x %s/script/py_pstree.py' % (clusterInfo.toolPath)
                    status, output = subprocess.getstatusoutput(cmd)
            if status != 0:
                raise Exception(f"failed to distribute py_pstree.py, Error output:\n{output}")
        self.logger.log("Successfully distribute py_pstree.py.")
        self.logger.log("Successfully install oGRecorder.")

    def preinstall(self, config_file, user, envfile):
        """
        Preinstall oGRecorder cluster using JSON configuration file
        """
        # Read JSON configuration file
        if not os.path.exists(config_file):
            print(f"Configuration file {config_file} does not exist!")
            sys.exit(1)
        
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except Exception as e:
            print(f"Failed to read JSON configuration file: {e}")
            sys.exit(1)
        
        # Extract cluster information from JSON config
        if 'cluster' not in config:
            print("Missing cluster configuration in JSON file!")
            sys.exit(1)
        
        cluster_config = config['cluster']
        nodes = cluster_config.get('nodes', [])
        
        if not nodes:
            print("No nodes found in cluster configuration!")
            sys.exit(1)
        
        # Initialize logger
        clusterInfo = dbClusterInfo()
        clusterInfo.initLogger("preinstall")
        logger = clusterInfo.logger
        
        # Set paths based on JSON configuration
        install_path = cluster_config.get('install_path', '/home/czk/install/')
        worm_path = cluster_config.get('worm_path', '/home/czk/data/')
        clusterInfo.installPath = install_path
        clusterInfo.toolPath = os.path.join(install_path, "tool")
        clusterInfo.grPath = os.path.join(install_path, "gr")
        clusterInfo.appPath = os.path.join(install_path, "gr")
        clusterInfo.logPath = os.path.join(install_path, "log")
        clusterInfo.tmpPath = os.path.join(install_path, "tmp")
        clusterInfo.wormPath = worm_path  # 保存worm_path到clusterInfo中
        datadnPath = os.path.join(install_path, "data/dn") # cm need "dn" to check disk, maybe need to change
        logger.log("Start preinstall oGRecorder.")
        
        # Create directories
        for node in nodes:
            node_name = node.get('ip', node.get('name', ''))
            # Create basic directories
            cmd = f"mkdir -p {clusterInfo.toolPath} {clusterInfo.grPath} {clusterInfo.appPath} {clusterInfo.logPath} {clusterInfo.tmpPath} {datadnPath} -m 755"
            status, output = self.execute_on_node(
                node_name, cmd, local_name=GetHostIpOrName(), ssh_ip=node_name)
            if status != 0:
                raise Exception(f"failed to create preinstall directory on {node_name}, Error output:\n{output}")
        logger.log("Successfully create preinstall directory.")

        # Change permissions
        for node in nodes:
            node_name = node.get('ip', node.get('name', ''))
            # 先设置wormPath权限（root用户执行，可以修改权限）
            cmd = f'chown {user}:{user} {clusterInfo.wormPath} && chmod 777 {clusterInfo.wormPath}'
            status, output = self.execute_on_node(
                node_name, cmd, local_name=GetHostIpOrName(), ssh_ip=node_name)
            if status != 0:
                logger.warn(f"Failed to set wormPath permissions on {node_name}: {output}")
                logger.warn(f"wormPath may be read-only or NFS mounted, continuing...")
            else:
                logger.log(f"Successfully set wormPath permissions on {node_name}")
            
            # 创建磁盘文件并设置权限（root用户执行）
            sharedisk_path = os.path.join(clusterInfo.wormPath, "sharedisk")
            votingdisk_path = os.path.join(clusterInfo.wormPath, "votingdisk")
            cmd = f"truncate -s 1G {sharedisk_path} && truncate -s 1G {votingdisk_path} && chmod 777 {sharedisk_path} && chmod 777 {votingdisk_path}"
            status, output = self.execute_on_node(
                node_name, cmd, local_name=GetHostIpOrName(), ssh_ip=node_name)
            if status != 0:
                logger.warn(f"Failed to create and set permissions for disk files on {node_name}: {output}")
                logger.warn(f"Continuing installation...")
            else:
                logger.log(f"Successfully created and set permissions for disk files on {node_name}")
            
            # 设置其他目录权限
            cmd = f'chown -R {user}:{user} {clusterInfo.toolPath} {clusterInfo.grPath} {clusterInfo.appPath} {clusterInfo.logPath} {clusterInfo.tmpPath} {datadnPath}'
            status, output = self.execute_on_node(
                node_name, cmd, local_name=GetHostIpOrName(), ssh_ip=node_name)
            if status != 0:
                raise Exception(f"failed to change permissions on {node_name}, Error output:\n{output}")
        logger.log("Successfully change permissions.")

        # Create environment variable file
        parent_dir = os.path.dirname(envfile)
        for node in nodes:
            node_name = node.get('ip', node.get('name', ''))
            if (node_name != GetHostIpOrName()):
                cmd = f"ssh -q -o ConnectTimeout=5 {node_name} 'mkdir -p {parent_dir} && touch {envfile}'"
            else:
                cmd = f'mkdir -p {parent_dir} && touch {envfile}'
            (status, output) = subprocess.getstatusoutput(cmd)
            if (status != 0):
                raise Exception(f"failed to create environment variable file on {node_name}, Error output:\n{output}")
        
        # Configure environment variables
        for node in nodes:
            node_name = node.get('ip', node.get('name', ''))
            if (node_name != GetHostIpOrName()):
                cmd = f'''ssh -q -o ConnectTimeout=5 {node_name} "cat > {envfile} <<'EOF'
export MPPDB_ENV_SEPARATE_PATH={envfile}
export GPHOME={clusterInfo.toolPath}
export GAUSSHOME={clusterInfo.appPath}
export GAUSSLOG={clusterInfo.logPath}
export GR_HOME={clusterInfo.grPath}
export PGHOST={clusterInfo.tmpPath}
export PATH=\$GAUSSHOME/bin/:\$GR_HOME/bin/:\$PATH
export LD_LIBRARY_PATH=\$GAUSSHOME/lib:\$GR_HOME/lib:\$LD_LIBRARY_PATH
EOF"
'''
            else:
                cmd = f'''cat > {envfile} <<'EOF'
export MPPDB_ENV_SEPARATE_PATH={envfile}
export GPHOME={clusterInfo.toolPath}
export GAUSSHOME={clusterInfo.appPath}
export GAUSSLOG={clusterInfo.logPath}
export GR_HOME={clusterInfo.grPath}
export PGHOST={clusterInfo.tmpPath}
export PATH=$GAUSSHOME/bin/:$GR_HOME/bin/:$PATH
export LD_LIBRARY_PATH=$GAUSSHOME/lib:$GR_HOME/lib:$LD_LIBRARY_PATH
EOF
'''
            (status, output) = subprocess.getstatusoutput(cmd)
            if (status != 0):
                raise Exception(f"failed to add environment variable on {node_name}, Error output:\n{output}")
        logger.log("Successfully add environment variable.")

        # Change permissions for environment file
        for node in nodes:
            node_name = node.get('ip', node.get('name', ''))
            cmd = f'chown -R {user}:{user} {parent_dir}'
            status, output = self.execute_on_node(
                node_name, cmd, local_name=GetHostIpOrName(), ssh_ip=node_name)
            if status != 0:
                raise Exception(f"failed to change permissions on {node_name}, Error output:\n{output}")
        logger.log("Successfully change permissions.")

        # Add crontab permission
        for node in nodes:
            node_name = node.get('ip', node.get('name', ''))
            cmd = f"grep -q '^{user}$' /etc/cron.allow || echo '{user}' >> /etc/cron.allow"
            status, output = self.execute_on_node(
                node_name, cmd, local_name=GetHostIpOrName(), ssh_ip=node_name)
            if status != 0:
                raise Exception(f"Failed to add user {user} to /etc/cron.allow on node {node_name}")
        logger.log("Successfully add crontab permission.")

        # Set /etc/security/limits.conf resource limits
        limits_conf_lines = [
            f"{user}       soft    as  unlimited",
            f"{user}       hard    as  unlimited",
            f"{user}       soft    nproc  unlimited",
            f"{user}       hard    nproc  unlimited",
            f"{user}       soft    nofile 1000000",
            f"{user}       hard    nofile 1000000",
        ]
        limits_conf_cmd = ""
        for line in limits_conf_lines:
            limits_conf_cmd += f"grep -q \"^{line}\" /etc/security/limits.conf || echo \"{line}\" >> /etc/security/limits.conf; "
        for node in nodes:
            node_name = node.get('ip', node.get('name', ''))
            status, output = self.execute_on_node(
                node_name, limits_conf_cmd, local_name=GetHostIpOrName(), ssh_ip=node_name)
            if status != 0:
                raise Exception(f"Failed to set limits.conf for user {user} on node {node_name}, Error output:\n{output}")
        logger.log("Successfully set limits.conf.")
        logger.log("Successfully preinstall oGRecorder.")

    def uninstall(self, config_file):
        """
        Tear down an oGRecorder deployment described by a JSON config file.

        Reads the cluster metadata from ``config_file``, then removes the
        installation directory contents and the separate environment file on
        every node, running the cleanup locally or over ssh as appropriate.

        :param config_file: path to the JSON cluster configuration.
        :raises Exception: if the env file cannot be resolved or any node's
                           cleanup command fails.
        """
        # ANSI escape sequences for colored console feedback.
        color_red = '\033[0;31m'
        color_green = '\033[0;32m'
        color_yellow = '\033[0;33m'
        color_reset = '\033[0m'

        # The configuration file must exist and parse as JSON.
        if not os.path.exists(config_file):
            print(f"{color_red}Configuration file {config_file} does not exist!{color_reset}")
            sys.exit(1)

        try:
            with open(config_file, 'r', encoding='utf-8') as handle:
                config = json.load(handle)
        except Exception as exc:
            print(f"{color_red}Failed to read JSON configuration file: {exc}{color_reset}")
            sys.exit(1)

        # The cluster section with a node list is mandatory.
        if 'cluster' not in config or 'nodes' not in config['cluster']:
            print(f"{color_red}Missing cluster.nodes configuration in JSON file!{color_reset}")
            sys.exit(1)

        cluster = config['cluster']
        cluster_name = cluster.get('name', 'cluster')
        install_path = cluster.get('install_path', '/home/czk/install/')
        nodes = cluster['nodes']
        if not nodes:
            print(f"{color_red}Node list is empty!{color_reset}")
            sys.exit(1)

        print(f"{color_green}Starting to uninstall oGRecorder cluster: {cluster_name}{color_reset}")
        print(f"{color_green}Installation path: {install_path}{color_reset}")
        print(f"{color_green}Number of nodes: {len(nodes)}{color_reset}")

        # Logging for the uninstall phase.
        self.initLogger("uninstall")
        self.logger.log("Start uninstall oGrecorder.")

        # Resolve the separated environment file for the current user.
        user = getpass.getuser()
        envfile = self.__getEnvironmentParameterValue("MPPDB_ENV_SEPARATE_PATH", user)
        if envfile == "":
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] %
                            ("installation path of designated user [%s]" % user))

        # Wipe the installation artifacts node by node.
        for node in nodes:
            node_name = node['name']
            node_ip = node['ip']

            print(f"{color_green}[{node_name}] Starting to clean node...{color_reset}")

            cmd = f'rm -rf {install_path}/* {install_path}/log/* {envfile}'

            # Run locally when the node's IP is ours, otherwise over ssh.
            if node_ip == socket.gethostbyname(socket.gethostname()):
                status, output = subprocess.getstatusoutput(cmd)
            else:
                status, output = subprocess.getstatusoutput(
                    f"ssh -q -o ConnectTimeout=5 {user}@{node_ip} '{cmd}'")

            if status != 0:
                error_msg = f"failed to clean oGRecorder directory on {node_name}. Error output:\n{output}"
                print(f"{color_red}{error_msg}{color_reset}")
                raise Exception(error_msg)

            print(f"{color_green}[{node_name}] Node cleanup completed{color_reset}")

        self.logger.log("Successfully uninstall oGRecorder.")
        print(f"{color_green}oGRecorder uninstallation completed!{color_reset}")

    def distribute_gr_certs(self, config_file, envFile):
        """
        Generate GR server and client certificates locally, push them to
        every remote node over scp, then ask each node to reload its certs.

        :param config_file: path to the JSON cluster configuration.
        :param envFile:     environment file to source before running grcmd.
        """
        # The configuration file must exist and parse as JSON.
        if not os.path.exists(config_file):
            print(f"Configuration file {config_file} does not exist!")
            sys.exit(1)

        try:
            with open(config_file, 'r', encoding='utf-8') as handle:
                config = json.load(handle)
        except Exception as exc:
            print(f"Failed to read JSON configuration file: {exc}")
            sys.exit(1)

        # A cluster section with at least one node is mandatory.
        if 'cluster' not in config:
            print("Missing cluster configuration in JSON file!")
            sys.exit(1)

        cluster_config = config['cluster']
        nodes = cluster_config.get('nodes', [])
        if not nodes:
            print("No nodes found in cluster configuration!")
            sys.exit(1)

        # Dedicated logger for the certificate distribution phase.
        clusterInfo = dbClusterInfo()
        clusterInfo.initLogger("gr_certs")
        logger = clusterInfo.logger

        user = getpass.getuser()

        # Paths derived from the JSON configuration.
        install_path = cluster_config.get('install_path', '/home/czk/install/')
        worm_path = cluster_config.get('worm_path', '/home/czk/data/')
        clusterInfo.installPath = install_path
        clusterInfo.grPath = os.path.join(install_path, "gr")
        clusterInfo.wormPath = worm_path  # keep worm_path on clusterInfo

        # Certificate directory; guard against appending "CA" twice.
        ca_path = clusterInfo.grPath
        if not ca_path.endswith("CA"):
            ca_path = os.path.join(ca_path, "CA")
        if not os.path.exists(ca_path):
            logger.log(f"Creating certificate directory: {ca_path}")
            os.makedirs(ca_path, exist_ok=True)

        # Generate server and client certificates with grcmd.
        logger.log("Start generate gr certs.")
        logger.log(f"Certificate path: {ca_path}")
        gen_cert_cmd = f"source {envFile} && grcmd gencert -t server -d 1000 && grcmd gencert -t client -d 1000"
        rc, out = subprocess.getstatusoutput(gen_cert_cmd)
        if rc != 0:
            logger.logExit(f"failed to create gr cert, Error output:\n{out}")
        logger.log("Successfully generate gr certs.")

        # The directory must now exist and contain at least one file.
        if not os.path.exists(ca_path) or not os.listdir(ca_path):
            logger.logExit(f"Certificate directory {ca_path} is empty or does not exist after generation")
        logger.log(f"Certificates generated successfully in: {ca_path}")

        # Loosen local permissions so scp can read the files (best effort).
        subprocess.getstatusoutput(f"chmod -R 755 {ca_path}")

        # Pre-create the directory and loosen permissions on every remote
        # node so the upcoming scp cannot fail on permissions (best effort).
        for node in nodes:
            target = node.get('ip', node.get('name', ''))
            if target != GetHostIpOrName():
                subprocess.getstatusoutput(
                    f"ssh -q -o ConnectTimeout=5 {target} 'mkdir -p {ca_path}; chmod -R 755 {ca_path}'")

        # Copy the generated certificates to each remote node.
        for node in nodes:
            target = node.get('ip', node.get('name', ''))
            if target == GetHostIpOrName():
                logger.log(f"Skipping local node: {target}")
                continue
            logger.log(f"Distributing certificates to node: {target}")
            rc, out = subprocess.getstatusoutput(f"scp -r {ca_path}/* {target}:{ca_path}/")
            if rc != 0:
                logger.logExit(f"failed to distribute gr cert to {target}, Error output:\n{out}")
            logger.log(f"Successfully distributed certificates to {target}")
        logger.log("Successfully distribute gr certs.")

        # Tighten permissions on top-level certificate files only;
        # the demoCA subtree is deliberately left untouched (best effort).
        for node in nodes:
            target = node.get('ip', node.get('name', ''))
            perm_cmd = f"find {ca_path} -maxdepth 1 -type f -exec chmod 400 {{}} +"
            if target != GetHostIpOrName():
                full_cmd = f"ssh -q -o ConnectTimeout=5 {target} '{perm_cmd}'"
            else:
                full_cmd = perm_cmd
            subprocess.getstatusoutput(full_cmd)

        # Ask every node to reload its certificates.
        for node in nodes:
            target = node.get('ip', node.get('name', ''))
            reload_cmd = f"source {envFile} && grcmd reload_certs"
            if target != GetHostIpOrName():
                full_cmd = f"ssh -q -o ConnectTimeout=5 {target} '{reload_cmd}'"
            else:
                full_cmd = reload_cmd
            rc, out = subprocess.getstatusoutput(full_cmd)
            if rc != 0:
                logger.logExit(f"failed to reload gr cert on {target}, Error output:\n{out}")
        logger.log("Successfully reload gr certs on all nodes.")

    def setup_ssh_trust(self, config_file):
        """
        Establish password-less SSH mutual trust between all cluster nodes.

        Steps:
          1. Generate an ed25519 key pair on every host (if missing).
          2. Collect every host's public key.
          3. Append the merged keys to each host's authorized_keys and
             distribute a shared known_hosts file.
          4. Verify ssh connectivity between every ordered pair of hosts.

        :param config_file: path to the JSON cluster configuration whose
                            cluster.nodes[].ip entries list the hosts.
        """
        # NOTE: getpass/tempfile are imported at file level; the previous
        # redundant local imports have been removed.
        RED = '\033[0;31m'
        GREEN = '\033[0;32m'
        YELLOW = '\033[0;33m'
        NC = '\033[0m'

        # Read host list from JSON config
        if not os.path.exists(config_file):
            print(f"{RED}Configuration file {config_file} does not exist!{NC}")
            sys.exit(1)

        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except Exception as e:
            print(f"{RED}Failed to read JSON configuration file: {e}{NC}")
            sys.exit(1)

        # Extract hosts from JSON config
        if 'cluster' not in config or 'nodes' not in config['cluster']:
            print(f"{RED}Missing cluster.nodes configuration in JSON file!{NC}")
            sys.exit(1)

        hosts = [node['ip'] for node in config['cluster']['nodes']]
        hosts = sorted(set(hosts))
        if not hosts:
            print(f"{RED}Host list is empty!{NC}")
            sys.exit(1)
        # NOTE(review): hosts hold IPs while admin_node is a hostname, so this
        # membership test may always miss -- confirm against deployment usage.
        admin_node = socket.gethostname()
        if admin_node not in hosts:
            print(f"{YELLOW}Host list does not include current node({admin_node}), adding automatically...{NC}")
            hosts = [admin_node] + hosts

        key_type = "ed25519"
        ssh_port = 22
        user = getpass.getuser()
        tmp_dir = tempfile.mkdtemp(prefix="ssh_trust_")

        try:
            # 1. Generate SSH keys
            for host in hosts:
                print(f"{GREEN}[{host}] Generating SSH keys...{NC}")
                if host == admin_node:
                    key_path = os.path.expanduser(f"~/.ssh/id_{key_type}")
                    if not os.path.exists(key_path):
                        os.system(f'ssh-keygen -t {key_type} -f {key_path} -N "" -q')
                else:
                    os.system(f'ssh -p {ssh_port} {user}@{host} "if [ ! -f ~/.ssh/id_{key_type} ]; then ssh-keygen -t {key_type} -f ~/.ssh/id_{key_type} -N \'\' -q; fi"')

            # 2. Collect public keys
            pub_keys = []
            for host in hosts:
                if host == admin_node:
                    pub_path = os.path.expanduser(f"~/.ssh/id_{key_type}.pub")
                    with open(pub_path, "r") as f:
                        pub_keys.append(f.read().strip())
                else:
                    tmp_pub = os.path.join(tmp_dir, f"{host}.pub")
                    os.system(f"scp -P {ssh_port} {user}@{host}:~/.ssh/id_{key_type}.pub {tmp_pub} 2>/dev/null")
                    if os.path.exists(tmp_pub):
                        with open(tmp_pub, "r") as f:
                            pub_keys.append(f.read().strip())
            all_keys = "\n".join(pub_keys)

            # 3. Distribute authorized_keys and known_hosts
            # Generate known_hosts
            known_hosts_path = os.path.join(tmp_dir, "known_hosts")
            with open(known_hosts_path, "w") as f:
                for host in hosts:
                    os.system(f"ssh-keyscan -p {ssh_port} -H {host} >> {known_hosts_path} 2>/dev/null")
            with open(known_hosts_path, "r") as f:
                known_hosts_content = f.read()

            for host in hosts:
                print(f"{GREEN}[{host}] Configuring authorized_keys and known_hosts...{NC}")
                if host == admin_node:
                    auth_path = os.path.expanduser("~/.ssh/authorized_keys")
                    with open(auth_path, "a") as f:
                        f.write("\n" + all_keys + "\n")
                    with open(os.path.expanduser("~/.ssh/known_hosts"), "a") as f:
                        f.write(known_hosts_content)
                    os.system("chmod 600 ~/.ssh/authorized_keys")
                else:
                    tmp_auth = os.path.join(tmp_dir, "all_keys")
                    with open(tmp_auth, "w") as f:
                        f.write(all_keys)
                    os.system(f"scp -P {ssh_port} {tmp_auth} {known_hosts_path} {user}@{host}:~/")
                    os.system(f'''ssh -p {ssh_port} {user}@{host} "mkdir -p ~/.ssh; cat ~/all_keys >> ~/.ssh/authorized_keys; cat ~/known_hosts >> ~/.ssh/known_hosts; rm -f ~/all_keys ~/known_hosts; chmod 700 ~/.ssh; chmod 600 ~/.ssh/authorized_keys"''')

            # 4. Verify mutual trust
            # BUGFIX: the previous commands used the bash-only "&>/dev/null"
            # redirection; os.system() runs /bin/sh, where "cmd &>file" parses
            # as "run cmd in the background, then truncate file". The exit
            # status was therefore always 0 and every pair reported Success.
            # Use the POSIX ">/dev/null 2>&1" form instead.
            print(f"{GREEN}Verifying mutual trust between nodes...{NC}")
            for src in hosts:
                for dst in hosts:
                    if src == dst:
                        continue
                    if src == admin_node:
                        ret = os.system(f"ssh -p {ssh_port} {dst} 'echo -n' >/dev/null 2>&1")
                    else:
                        ret = os.system(f"ssh -p {ssh_port} {src} \"ssh -p {ssh_port} {dst} 'echo -n'\" >/dev/null 2>&1")
                    if ret == 0:
                        print(f"[{src} → {dst}] {GREEN}Success{NC}")
                    else:
                        print(f"[{src} → {dst}] {RED}Failed{NC}")
        finally:
            # Always clean up the staging directory, even on failure
            # (previously leaked when any step above raised).
            shutil.rmtree(tmp_dir)

        print(f"{GREEN}SSH mutual trust configuration completed for all cluster nodes!{NC}")

class ErrorCode():
    """
    Registry of GAUSS error-message templates plus a helper for mapping an
    error message to a process exit code.
    """

    def __init__(self):
        pass

    @staticmethod
    def getErrorCodeAsInt(ex, default_error_code):
        """
        Map an exception (or error-message string) to a process exit code.

        Linux exit statuses live in 0..255, so each GAUSS error family is
        collapsed to a single small code, e.g.:
            ErrorCode.GAUSS_500 : 10
            ErrorCode.GAUSS_501 : 11

        :param ex:                  Exception instance or error message.
        :param default_error_code:  Fallback code used when no
                                    "[GAUSS-xxxxx]" marker is found.

        :type ex:                   Exception | str
        :type default_error_code:   int

        :return: the resolved exit code; 9 means "undefined".
        :rtype:  int
        """
        found = re.match(r"^[\S\s]*\[GAUSS-(\d+)\][\S\s]+$", str(ex))
        code = int(found.group(1)) if found is not None else default_error_code

        # Only the GAUSS-5xxxx families have a defined exit-code mapping.
        if not 50000 < code < 60000:
            return 9
        return code // 100 - 500 + 10

    GAUSS_500 = {
        'GAUSS_50000': "[GAUSS-50000] : Unrecognized parameter: %s.",
        'GAUSS_50001': "[GAUSS-50001] : Incorrect parameter. Parameter '-%s' is required",
        'GAUSS_50011': "[GAUSS-50011] : The parameter[%s] value[%s] is invalid.",
        'GAUSS_50024': "[GAUSS-50024] : The parameter [%s] value is invalid.",
    }

    GAUSS_501 = {
        'GAUSS_50100': "[GAUSS-50100] : The %s is not readable for %s.",
        'GAUSS_50104': "[GAUSS-50104] : Only a user with the root permission can run this script.",
        'GAUSS_50105': "[GAUSS-50105] : Cannot run this script as a user with the root permission.",
    }

    GAUSS_502 = {
        'GAUSS_50200': "[GAUSS-50200] : The %s already exists.",
        'GAUSS_50201': "[GAUSS-50201] : The %s does not exist.",
        'GAUSS_50203': "[GAUSS-50203] : The %s cannot be empty.",
        'GAUSS_50204': "[GAUSS-50204] : Failed to read %s.",
        'GAUSS_50205': "[GAUSS-50205] : Failed to write %s.",
        'GAUSS_50206': "[GAUSS-50206] : The %s is a symbolic link.",
        'GAUSS_50208': "[GAUSS-50208] : Failed to create %s.",
        'GAUSS_50209': "[GAUSS-50209] : Failed to access %s.",
        'GAUSS_50210': "[GAUSS-50210] : The %s is not a regular file.",
        'GAUSS_50211': "[GAUSS-50211] : The %s is not a directory.",
        'GAUSS_50213': "[GAUSS-50213] : Failed to parse %s.",
        'GAUSS_50216': "[GAUSS-50216] : Failed to distribute %s.",
        'GAUSS_50219': "[GAUSS-50219] : The %s is invalid.",
        'GAUSS_50230': "[GAUSS-50230] : Failed to read/write %s.",
    }

    GAUSS_503 = {
        'GAUSS_50300': "[GAUSS-50300] : User %s does not exist.",
    }

    GAUSS_506 = {
        'GAUSS_50602': "[GAUSS-50602] : Failed to bind network adapters.",
        'GAUSS_50603': "[GAUSS-50603] : The IP address is invalid.",
    }

    GAUSS_508 = {
        'GAUSS_50801': "[GAUSS-50801] : Failed to set up tasks.",
    }

    GAUSS_511 = {
        'GAUSS_51100': "[GAUSS-51100] : Failed to verify SSH trust on these nodes: %s.",
    }

    GAUSS_512 = {
        'GAUSS_51200': "[GAUSS-51200] : The parameter [%s] in the XML file does not exist.",
        'GAUSS_51230': "[GAUSS-51230] : The number of %s must %s.",
    }

    GAUSS_514 = {
        'GAUSS_51400': "[GAUSS-51400] : Failed to execute command: %s",
    }

    GAUSS_516 = {
        'GAUSS_51600': "[GAUSS-51600] : Failed to check cluster status.",
        'GAUSS_51637': "[GAUSS-51637] : Data directory[%s] is conflicting.",
        'GAUSS_51638': "[GAUSS-51638] : Data directory[%s] is conflicting.",
        'GAUSS_51649': "[GAUSS-51649] : Capture exceptions '%s' : %s.",
        'GAUSS_51650': "[GAUSS-51650] : Unclassified exceptions: %s.",
    }

    GAUSS_518 = {
        'GAUSS_51800': "[GAUSS-51800] : The environmental variable %s is empty. or variable has exceeded maximum length",
        'GAUSS_51802': "[GAUSS-51802] : The environmental variable %s is empty or invalid.",
    }

    GAUSS_532 = {}

class OmError(BaseException):
    """
    Used to record OM exception information and support ErrorCode
    keywords as message information.

    NOTE(review): deliberately derives from BaseException, so a plain
    ``except Exception`` will NOT catch it -- confirm callers rely on this
    before changing the base class.
    """

    def __init__(self, _message, *args, **kwargs):
        """
        Initialize the OmError instance.

        :param _message:    The input error message, it can be the error
                            message string, or the ErrorCode keywords,
                            or the Exception instance.
        :param args:        The additional unnamed parameters that use
                            to format the error message.
        :param kwargs:      The additional named parameters that use to format
                            the error message or extend to other
                            functions.

        :type _message:     str | BaseException
        :type args:         str | int
        :type kwargs:       str | int
        """
        # If we catch an unhandled exception.
        if isinstance(_message, Exception):
            # Store the error code.
            self._errorCode = ""
            # Store the error message.
            self._message = self.__getErrorMessage(str(_message), args, kwargs)
            # __getErrorMessage falls back to "GAUSS_51650" when no
            # "[GAUSS-xxxxx]" marker can be parsed from the text; treat that
            # as "unparsed" and re-wrap the raw exception as GAUSS_51649.
            # BUGFIX: the previous check `if not self._errorCode:` was dead
            # code, because __getErrorMessage always assigns a non-empty
            # code, so the GAUSS_51649 wrapping never fired.
            if self._errorCode == "GAUSS_51650":
                # Store the error code.
                self._errorCode = "GAUSS_51649"
                # Store the error message.
                self._message = ErrorCode.GAUSS_516[self._errorCode] % (
                    type(_message).__name__, repr(_message))
        else:
            # Store the error code.
            self._errorCode = ""
            # Store the error message.
            self._message = self.__getErrorMessage(_message, args, kwargs)

        # Store the stack information of the exception being handled, if any.
        self._stackInfo = sys.exc_info()[2]

    @property
    def message(self):
        """
        Getter, get the error message.

        :return:    Return the error message.
        :rtype:     str
        """
        return self._message

    @property
    def errorCode(self):
        """
        Getter, get the error code.

        :return:    Return the error code.
        :rtype:     str
        """
        return self._errorCode

    def __getErrorMessage(self, _errorCode, args, kwargs):
        """
        Get error information through error code.

        Side effect: sets ``self._errorCode`` to the parsed "GAUSS-xxxxx"
        code, or to "GAUSS_51650" when the text carries no code marker.

        :param _errorCode:  Error code or raw message text.
        :param args:        Additional positional format parameters.
        :param kwargs:      Additional named format parameters.

        :type _errorCode:   str
        :type args:         tuple
        :type kwargs:       dict | None

        :return:    Return the error message.
        :rtype:     str
        """
        # Get base error information through error code.
        pattern = r"^[\S\s]*\[(GAUSS-\d+)\][\S\s]+$"
        match = re.match(pattern, str(_errorCode))
        if match and len(match.groups()) == 1:
            self._errorCode = match.groups()[0]
            message = _errorCode
        else:
            self._errorCode = "GAUSS_51650"
            message = ErrorCode.GAUSS_516[self._errorCode] % _errorCode

        # Format parameter which type is "%(param)s".
        if kwargs:
            for key, value in kwargs.items():
                if value is not None:
                    message = message.replace("%(" + key + ")s", str(value))
                else:
                    message = message.replace("%(" + key + ")s", "'None'")

        # Format standard type parameters.
        if args:
            # Convert tuple to list.
            args = list(args)
            # Travel the list, stringifying every element (None -> "'None'").
            for i, arg in enumerate(args):
                if arg is None:
                    args[i] = "'None'"
                else:
                    args[i] = str(arg)

            # Format the message.
            message %= tuple(args)

        return message

    def __str__(self):
        """
        Show this instance as a string.

        :return:    Return this instance as a string.
        :rtype:     str
        """
        return self.message

    def __repr__(self):
        """
        Show this instance as a string.

        :return:    Return this instance as a string.
        :rtype:     str
        """
        return self.__str__()


class InstallImpl:
    def __init__(self, install):
        self.cmpkg = install.cmpkg
        self.context = install
        self.envFile = install.envFile
        self.xmlFile = install.xmlFile
        self.cmDirs = install.cmDirs
        self.hostnames = install.hostnames
        self.gaussHome = install.gaussHome
        self.gaussLog = install.gaussLog
        self.toolPath = install.toolPath
        self.tmpPath = install.tmpPath
        self.localhostName = install.localhostName
        self.logger = install.logger
        self.clusterStopped = install.clusterStopped
        self.primaryTermAbnormal = install.primaryTermAbnormal
        # 初始化 clusterInfo 用于主机名到 IP 的映射
        self.clusterInfo = None
        try:
            self.clusterInfo = dbClusterInfo()
            self.clusterInfo.initFromXml(self.xmlFile)
        except:
            pass  # 如果初始化失败，继续使用原始主机名

    def executeCmdOnHost(self, host, cmd, isLocal = False):
        """
        Run a command on the given host, resolving the host name to an ssh
        IP from clusterInfo when possible.

        :param host:    Host name the command should run on.
        :param cmd:     Shell command to execute.
        :param isLocal: Force local execution (set automatically for localhost).
        :return:        (status, output) from the module-level executeCmdOnHost.
        """
        if host == self.localhostName:
            isLocal = True
        else:
            # Map the host name to its ssh IP via clusterInfo; on any failure
            # keep the original host name (best effort only). Catch Exception,
            # not everything: a bare except would also swallow SystemExit.
            try:
                if getattr(self, 'clusterInfo', None):
                    for dbNode in self.clusterInfo.dbNodes:
                        if dbNode.name == host and getattr(dbNode, 'sshIps', None):
                            host = dbNode.sshIps[0]
                            break
            except Exception:
                pass  # Fall back to the raw host name.
        return executeCmdOnHost(host, cmd, isLocal)

    def prepareCMPath(self):
        """
        create path: cmdir、cmdir/cm_server、cmdir/cm_agent
        """
        self.logger.log("Preparing CM path.")
        for (cmdir, host) in zip(self.cmDirs, self.hostnames):
            cmd = "mkdir -p {cmdir}/cm_server {cmdir}/cm_agent".format(cmdir=cmdir)
            status, output = self.executeCmdOnHost(host, cmd)
            if status != 0:
                self.logger.debug("Command: " + cmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit("Failed to create CM path." + errorDetail)

    def decompressCMPkg(self):
        self.logger.log("Decompressing CM pacakage.")
        if self.cmpkg == "":
            return
        # decompress cm pkg on localhost
        decompressCmd = "tar -zxf %s -C %s" % (self.cmpkg, self.gaussHome)
        status, output = subprocess.getstatusoutput(decompressCmd)
        if status != 0:
            self.logger.debug("Command: " + decompressCmd)
            errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
            self.logger.logExit("Failed to decompress cm pacakage to on localhost." + errorDetail)

        # If the version of CM pacakage is inconsistent with that of gaussdb,
        # then exit. So no need to send CM pacakage to other nodes.
        # self.checkCMPkgVersion()

        # decompress cmpkg on other hosts
        cmpkgName = os.path.basename(self.cmpkg)
        for host in self.hostnames:
            if host == self.localhostName:
                continue
            # copy cm pacakage to other hosts
            # 尝试从 clusterInfo 获取对应的 IP 地址
            ssh_ip = host
            try:
                if hasattr(self, 'clusterInfo') and self.clusterInfo:
                    for dbNode in self.clusterInfo.dbNodes:
                        if dbNode.name == host and hasattr(dbNode, 'sshIps') and dbNode.sshIps:
                            ssh_ip = dbNode.sshIps[0]
                            break
            except:
                pass  # 如果获取失败，继续使用原始主机名
            
            if ":" in ssh_ip:
                ssh_ip = "[" + ssh_ip + "]"
            scpCmd = "scp %s %s:%s" % (self.cmpkg, ssh_ip, self.toolPath)
            status, output = subprocess.getstatusoutput(scpCmd)
            if status != 0:
                self.logger.debug("Command: " + scpCmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit(("Failed to send cm pacakage to %s." % ssh_ip) + errorDetail)
            pkgPath = os.path.join(self.toolPath, cmpkgName)
            decompressCmd = "tar -zxf %s -C %s" % (pkgPath, self.gaussHome)
            status, output = self.executeCmdOnHost(host, decompressCmd)
            if status != 0:
                self.logger.debug("Command: " + decompressCmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit(("Failed to decompress cm pacakage to on host %s." % host) + errorDetail)

    def checkCMPkgVersion(self):
        getCMVersionCmd = "source %s; cm_ctl -V" % self.envFile
        status, output = subprocess.getstatusoutput(getCMVersionCmd)
        if status != 0:
            self.logger.logExit("Failed to get CM pacakage version.")
        cmVersionList = re.findall(r'.*CM (\d.*\d) build', output)
        if len(cmVersionList) == 0:
            self.logger.logExit("Failed to get CM pacakage version.")
        cmVersion = cmVersionList[0]

        # getGaussdbVersionCmd = "source %s; gaussdb -V" % self.envFile
        # status, output = subprocess.getstatusoutput(getGaussdbVersionCmd)
        # if status != 0:
        #     self.logger.logExit("Failed to get gaussdb version.")
        # gaussdbVersionList = re.findall(r'openGauss (\d.*\d) build', output)
        # if len(gaussdbVersionList) == 0:
        #     self.logger.logExit("Failed to get gaussdb version.")
        # gaussdbVersion = gaussdbVersionList[0]

        # if gaussdbVersion != cmVersion:
        #     self.logger.logExit("The version of CM pacakage(%s) is inconsistent "
        #         "with that of gaussdb(%s)." % (cmVersion, gaussdbVersion))

    def createManualStartFile(self):
        self.logger.log("Creating cluster_manual_start file.")
        cmd = """
            if [ ! -f {gaussHome}/bin/cluster_manual_start ]; then
                touch {gaussHome}/bin/cluster_manual_start
            fi
            """.format(gaussHome=self.gaussHome)
        for host in self.hostnames:
            status, output = self.executeCmdOnHost(host, cmd)
            if status != 0:
                self.logger.debug("Command: " + cmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit("Failed to create cluster_manual_start file." + errorDetail)

    def initCMServer(self):
        self.logger.log("Initializing cm_server.")
        for (cmdir, host) in zip(self.cmDirs, self.hostnames):
            # 构建磁盘文件路径
            sharedisk_path = os.path.join(self.clusterInfo.wormPath, "sharedisk")
            votingdisk_path = os.path.join(self.clusterInfo.wormPath, "votingdisk")
            
            cmd = """
                cp {gaussHome}/share/config/cm_server.conf.sample {cmdir}/cm_server/cm_server.conf
                sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_server#' {cmdir}/cm_server/cm_server.conf -i
                sed 's#ddb_type = .*#ddb_type = 2#' {cmdir}/cm_server/cm_server.conf -i
                sed 's#dn_arbitrate_mode = .*#dn_arbitrate_mode = share_disk#' {cmdir}/cm_server/cm_server.conf -i
                sed 's#share_disk_path = .*#share_disk_path = \'{sharedisk_path}\'#' {cmdir}/cm_server/cm_server.conf -i
                sed 's#voting_disk_path = .*#voting_disk_path = \'{votingdisk_path}\'#' {cmdir}/cm_server/cm_server.conf -i
                sed 's#disk_timeout = .*#disk_timeout = 6#' {cmdir}/cm_server/cm_server.conf -i
                """.format(gaussHome=self.gaussHome, gaussLog=self.gaussLog, cmdir=cmdir, 
                          sharedisk_path=sharedisk_path, votingdisk_path=votingdisk_path)
            status, output = self.executeCmdOnHost(host, cmd)
            if status != 0:
                self.logger.debug("Command: " + cmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit("Failed to initialize cm_server." + errorDetail)

    def initCMAgent(self):
        self.logger.log("Initializing cm_agent.")
        for (cmdir, host) in zip(self.cmDirs, self.hostnames):
            # 构建投票磁盘文件路径
            votingdisk_path = os.path.join(self.clusterInfo.wormPath, "votingdisk")
            
            cmd = """
                cp {gaussHome}/share/config/cm_agent.conf.sample {cmdir}/cm_agent/cm_agent.conf && 
                sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_agent#' {cmdir}/cm_agent/cm_agent.conf -i && 
                sed 's#unix_socket_directory = .*#unix_socket_directory = {gaussHome}#' {cmdir}/cm_agent/cm_agent.conf -i &&
                sed 's#voting_disk_path = .*#voting_disk_path = \'{votingdisk_path}\'#' {cmdir}/cm_agent/cm_agent.conf -i &&
                sed 's#disk_timeout = .*#disk_timeout = 6#' {cmdir}/cm_agent/cm_agent.conf -i
                """.format(gaussHome=self.gaussHome, gaussLog=self.gaussLog, cmdir=cmdir, 
                          votingdisk_path=votingdisk_path)
            status, output = self.executeCmdOnHost(host, cmd)
            if status != 0:
                self.logger.debug("Command: " + cmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit("Failed to initialize cm_agent." + errorDetail)

    def AddGrResource(self):
        """
        Add gr Resource
        """
        self.logger.log("Add gr resource.")
        cmd = "source %s ; sh %s/gr_res.sh" % (self.envFile, self.toolPath)
        for host in self.hostnames:
            status, output = self.executeCmdOnHost(host, cmd)
            if status != 0:
                self.logger.debug("Command: " + cmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit(("Failed to add gr resource to %s." % host) + errorDetail)

    def setMonitorCrontab(self):
        """
        Install an om_monitor crontab entry on every host and start
        om_monitor immediately.

        Steps: dump the current local crontab to a temp file, strip any old
        om_monitor entries, append a fresh per-minute entry, push that
        crontab to all hosts (scp + "crontab <file>") and launch om_monitor
        on each, localhost last.
        """
        self.logger.log("Setting om_monitor crontab.")
        # Save the current crontab content into cronContentTmpFile.
        cronContentTmpFile = os.path.join(self.tmpPath, "cronContentTmpFile_" + str(os.getpid()))
        listCronCmd = "crontab -l > %s" % cronContentTmpFile
        status, output = self.executeCmdOnHost(self.localhostName, listCronCmd)
        # "crontab -l" exits 1 with "no crontab for <user>" when the user has
        # no crontab yet; that is not an error here.
        is_no_crontab = ("no crontab" in output.lower() and status == 1)
        if status != 0 and not is_no_crontab:
            self.logger.debug("Command: " + listCronCmd)
            errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
            self.logger.logExit(ErrorCode.GAUSS_508["GAUSS_50804"] + errorDetail)
        # If the old crontab content contains om_monitor entries, remove them.
        clearMonitorCmd = "sed '/.*om_monitor.*/d' %s -i" % cronContentTmpFile
        status, output = subprocess.getstatusoutput(clearMonitorCmd)
        if status != 0:
            os.remove(cronContentTmpFile)
            self.logger.debug("Command: " + clearMonitorCmd)
            errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
            self.logger.logExit("Failed to clear old om_monitor crontab." + errorDetail)

        # Generate the om_monitor start command and append a per-minute
        # crontab entry for it to cronContentTmpFile.
        startMonitorCmd = "source /etc/profile;(if [ -f ~/.profile ];" \
                      "then source ~/.profile;fi);source ~/.bashrc;"
        if self.envFile != "~/.bashrc":
            startMonitorCmd += "source %s; " % (self.envFile)
        monitorLogPath = os.path.join(self.gaussLog, "cm")
        if not os.path.exists(monitorLogPath):
            os.makedirs(monitorLogPath)
        startMonitorCmd += "nohup om_monitor -L %s/om_monitor >>/dev/null 2>&1 &" % monitorLogPath
        monitorCron = "*/1 * * * * " + startMonitorCmd + os.linesep
        with open(cronContentTmpFile, 'a+', encoding='utf-8') as fp:
            fp.writelines(monitorCron)
            fp.flush()

        def resolveSshIp(hostName):
            """Map a host name to its ssh IP via clusterInfo (best effort)."""
            try:
                if getattr(self, 'clusterInfo', None):
                    for dbNode in self.clusterInfo.dbNodes:
                        if dbNode.name == hostName and getattr(dbNode, 'sshIps', None):
                            return dbNode.sshIps[0]
            except Exception:
                pass  # Fall back to the raw host name.
            return hostName

        # Install the crontab on the other hosts first.
        setCronCmd = "crontab %s" % cronContentTmpFile
        cleanTmpFileCmd = "rm %s -f" % cronContentTmpFile
        username = getpass.getuser()
        killMonitorCmd = "pkill om_monitor -u %s; " % username
        for host in self.hostnames:
            if host == self.localhostName:
                continue
            # Copy cronContentTmpFile to the other host.
            ssh_ip = resolveSshIp(host)
            if ":" in ssh_ip:
                # IPv6 addresses must be bracketed for scp.
                ssh_ip = "[" + ssh_ip + "]"
            scpCmd = "scp %s %s:%s" % (cronContentTmpFile, ssh_ip, self.tmpPath)
            status, output = subprocess.getstatusoutput(scpCmd)
            if status != 0:
                self.logger.debug("Command: " + scpCmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit(("Failed to copy cronContentTmpFile to %s." % ssh_ip) + errorDetail)
            # Set om_monitor crontab.
            status, output = self.executeCmdOnHost(host, setCronCmd)
            # Clean up the remote temp file before checking the status.
            self.executeCmdOnHost(host, cleanTmpFileCmd)
            if status != 0:
                self.logger.debug("Command: " + setCronCmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit(ErrorCode.GAUSS_508["GAUSS_50801"] + errorDetail)

            # Start om_monitor.
            # Firstly, kill residual om_monitor, otherwise cm_agent won't be
            # started if there are residual om_monitor processes.
            status, output = self.executeCmdOnHost(host, killMonitorCmd + startMonitorCmd)
            if status != 0:
                self.logger.debug("Command: " + startMonitorCmd)
                errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
                self.logger.logExit((ErrorCode.GAUSS_516["GAUSS_51607"] % "om_monitor") + errorDetail)

        # Install the crontab on localhost.
        status, output = subprocess.getstatusoutput(setCronCmd)
        os.remove(cronContentTmpFile)
        if status != 0:
            self.logger.debug("Command: " + setCronCmd)
            errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
            self.logger.logExit(ErrorCode.GAUSS_508["GAUSS_50801"] + errorDetail)

        status, output = subprocess.getstatusoutput(killMonitorCmd + startMonitorCmd)
        if status != 0:
            self.logger.debug("Command: " + startMonitorCmd)
            errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
            self.logger.logExit((ErrorCode.GAUSS_516["GAUSS_51607"] % "om_monitor") + errorDetail)

    def startCluster(self):
        self.logger.log("Starting cluster.")
        startCmd = "source %s; cm_ctl start" % self.envFile
        status, output = subprocess.getstatusoutput(startCmd)
        if status != 0:
            self.logger.debug("Command: " + startCmd)
            errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
            self.logger.logExit("Failed to start cluster." + errorDetail)

        # status, output = InstallImpl.refreshDynamicFile(self.envFile)
        # if status != 0:
        #     self.logger.error("Failed to refresh dynamic file." + output)

        queryCmd = "source %s; cm_ctl query -Cv" % self.envFile
        status, output = subprocess.getstatusoutput(queryCmd)
        if status != 0:
            self.logger.debug("Command: " + queryCmd)
            errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
            self.logger.logExit("Failed to query cluster status." + errorDetail)
        self.logger.log(output)
        self.logger.log("Install CM tool success.")
        if self.primaryTermAbnormal:
            self.logger.warn("Term of primary is invalid or not maximal.\n"
                "Hint: To avoid CM arbitration anomalies in this situation, "
                "please restart the database.\n"
                "Command : cm_ctl stop && cm_ctl start")

    @staticmethod
    def refreshStaticFile(envFile, xmlFile):
        """
        refresh static and dynamic file using xml file with cm
        """
        # refresh static file
        cmd = """
            source {envFile};
            gr_om -t generateconf -X {xmlFile} --distribute
            """.format(envFile=envFile, xmlFile=xmlFile)
        status, output = subprocess.getstatusoutput(cmd)
        errorDetail = ""
        if status != 0:
            errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s" % (cmd, status, output)
        return status, errorDetail

    @staticmethod
    def refreshDynamicFile(envFile):
        # refresh dynamic file
        refreshDynamicFileCmd = "source %s; gr_om -t refreshconf" % envFile
        status, output = subprocess.getstatusoutput(refreshDynamicFileCmd)
        errorDetail = ""
        if status != 0:
            errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s" % (refreshDynamicFileCmd, status, output)
        return status, errorDetail

    @staticmethod
    def checkPassword(passwordCA):
        minPasswordLen = 8
        maxPasswordLen = 15
        kinds = [0, 0, 0, 0]
        specLetters = "~!@#$%^&*()-_=+\\|[{}];:,<.>/?"
        if len(passwordCA) < minPasswordLen:
            print("Invalid password, it must contain at least eight characters.")
            return False
        if len(passwordCA) > maxPasswordLen:
            print("Invalid password, it must contain at most fifteen characters.")
            return False
        for c in passwordCA:
            if isdigit(c):
                kinds[0] += 1
            elif isupper(c):
                kinds[1] += 1
            elif islower(c):
                kinds[2] += 1
            elif c in specLetters:
                kinds[3] += 1
            else:
                print("The password contains illegal character: %s." % c)
                return False
        kindsNum = 0
        for k in kinds:
            if k > 0:
                kindsNum += 1
        if kindsNum < 3:
            print("The password must contain at least three kinds of characters.")
            return False
        return True

    def _getPassword(self):
        """
        Interactively prompt (up to 3 attempts) for the CA cert password,
        requiring the two entries to match and pass checkPassword.

        :return: The accepted password; exits after 3 failed attempts.
        """
        passwordCA = ""
        passwordCA2 = ""
        tryCount = 0
        while tryCount < 3:
            passwordCA = getpass.getpass("Please input the password for ca cert:")
            passwordCA2 = getpass.getpass("Please input the password for ca cert again:")
            if passwordCA != passwordCA2:
                tryCount += 1
                # Typo fix in the user-facing message: "enterd" -> "entered".
                self.logger.printMessage("The password entered twice do not match.")
                continue
            if not InstallImpl.checkPassword(passwordCA):
                tryCount += 1
                continue
            break
        if tryCount == 3:
            self.logger.logExit("Maximum number of attempts has been reached.")
        return passwordCA

    def _createCMSslConf(self, certPath):
        """
        Generate config file.
        """
        self.logger.debug("OPENSSL: Create config file.")
        v3CaL = [
            "[ v3_ca ]",
            "subjectKeyIdentifier=hash",
            "authorityKeyIdentifier=keyid:always,issuer:always",
            "basicConstraints = CA:true",
            "keyUsage = keyCertSign,cRLSign",
        ]
        v3Ca = os.linesep.join(v3CaL)

        # Create config file.
        with open(os.path.join(certPath, "openssl.cnf"), "w") as fp:
            # Write config item of Signature
            fp.write(v3Ca)
        self.logger.debug("OPENSSL: Successfully create config file.")

    def _cleanUselessFile(self):
        """
        Clean useless files
        :return: NA
        """
        certPath = os.path.join(self.gaussHome, "share/sslcert/cm")
        keyFiles = ["cacert.pem", "server.crt", "server.key", "client.crt", "client.key",
            "server.key.cipher", "server.key.rand", "client.key.cipher", "client.key.rand"]
        for fileName in os.listdir(certPath):
            filePath = os.path.join(certPath, fileName)
            if fileName not in keyFiles:
                os.remove(filePath)

    def _createCMCALocal(self):
        """
        Generate the CM CA, server and client certificates locally with
        openssl under GAUSSHOME/share/sslcert/cm, encrypt the key passwords
        via "cm_ctl encrypt" (driven through expect), verify the produced
        cipher/rand files, set read-only permissions and remove leftover
        files.

        Exits via logger.logExit on any failure.
        """
        self.logger.debug("Creating Cm ca files locally.")

        # Pin openssl to the system config. The del is redundant because the
        # variable is unconditionally overwritten on the next line.
        if 'OPENSSL_CONF' in os.environ:
            del os.environ['OPENSSL_CONF']
        os.environ['OPENSSL_CONF'] = '/etc/pki/tls/openssl.cnf'
        certPath = os.path.join(self.gaussHome, "share/sslcert/cm")
        # Recreate the cert directory from scratch.
        mkdirCmd = f"rm -rf {certPath}; mkdir -p {certPath}"
        status, output = subprocess.getstatusoutput(mkdirCmd)
        if status != 0:
            self.logger.debug(f"Command: {mkdirCmd}\nStatus: {status}\nOutput: {output}")
            self.logger.logExit("Failed to create cert path.")
        self._createCMSslConf(certPath)
        passwd = self._getPassword()

        # Certificate validity period in days.
        activePeriod = "10950"
        opensslConf = os.path.join(certPath, "openssl.cnf")
        if not os.path.isfile(opensslConf):
            self.logger.logExit("CM ssl conf does not exist.")

        # Generate cakey.pem (CA private key).
        # NOTE(review): piping the password via `echo "{passwd}" |` places it
        # on a shell command line, where it may be visible in the process
        # list; consider a file/fd based -passout instead.
        gen_cakey_cmd = f'echo "{passwd}" | openssl genrsa -aes256 -f4 -passout stdin -out {certPath}/cakey.pem 2048'
        status, output = subprocess.getstatusoutput(gen_cakey_cmd)
        if status != 0:
            self.logger.logExit("Failed to generate cakey.pem.\n" + output)

        # Generate cacert.pem (self-signed root certificate).
        gen_cacert_cmd = f'echo "{passwd}" | openssl req -new -x509 -passin stdin -days {activePeriod} -key {certPath}/cakey.pem -out {certPath}/cacert.pem -subj "/C=CN/ST=NULL/L=NULL/O=NULL/OU=NULL/CN=CA"'
        status, output = subprocess.getstatusoutput(gen_cacert_cmd)
        if status != 0:
            self.logger.logExit("Failed to generate cacert.pem.\n" + output)

        for role in ["server", "client"]:
            # Generate the private key for this role.
            gen_key_cmd = f'echo "{passwd}" | openssl genrsa -aes256 -passout stdin -out {certPath}/{role}.key 2048'
            status, output = subprocess.getstatusoutput(gen_key_cmd)
            if status != 0:
                self.logger.logExit(f"Failed to generate {role}.key.\n" + output)
            # Generate the certificate signing request.
            gen_csr_cmd = f'echo "{passwd}" | openssl req -new -key {certPath}/{role}.key -passin stdin -out {certPath}/{role}.csr -subj "/C=CN/ST=NULL/L=NULL/O=NULL/OU=NULL/CN={role}"'
            status, output = subprocess.getstatusoutput(gen_csr_cmd)
            if status != 0:
                self.logger.logExit(f"Failed to generate {role}.csr.\n" + output)
            # Sign the request with the CA to produce the certificate.
            gen_crt_cmd = f'echo "{passwd}" | openssl x509 -req -days {activePeriod} -in {certPath}/{role}.csr -CA {certPath}/cacert.pem -CAkey {certPath}/cakey.pem -passin stdin -CAcreateserial -out {certPath}/{role}.crt -extfile {certPath}/openssl.cnf'
            status, output = subprocess.getstatusoutput(gen_crt_cmd)
            if status != 0:
                self.logger.logExit(f"Failed to generate {role}.crt.\n" + output)
            # Remove the intermediate csr file.
            rm_csr_cmd = f'rm -f {certPath}/{role}.csr'
            subprocess.getstatusoutput(rm_csr_cmd)

        # Generate server cipher and rand files via "cm_ctl encrypt",
        # answering its password prompt through expect.
        expect_server_cmd = (
            f'expect -c \'spawn cm_ctl encrypt -M server -D {certPath}; '
            f'expect "*password*" {{ send "{passwd}\\r"; exp_continue }}\''
        )
        status, output = subprocess.getstatusoutput(expect_server_cmd)
        if status != 0:
            self.logger.logExit("Failed to encrypt server key.\n" + output)
        
        # Double-check the artifacts exist in addition to the exit status.
        server_rand_file = os.path.join(certPath, "server.key.rand")
        server_cipher_file = os.path.join(certPath, "server.key.cipher")
        if not os.path.exists(server_rand_file):
            self.logger.logExit("Failed to generate server.key.rand.\n" + output)
        if not os.path.exists(server_cipher_file):
            self.logger.logExit("Failed to generate server.key.cipher.\n" + output)

        # Generate client cipher and rand files the same way.
        expect_client_cmd = (
            f'expect -c \'spawn cm_ctl encrypt -M client -D {certPath}; '
            f'expect "*password*" {{ send "{passwd}\\r"; exp_continue }}\''
        )
        status, output = subprocess.getstatusoutput(expect_client_cmd)
        if status != 0:
            self.logger.logExit("Failed to encrypt client key.\n" + output)
        
        # Double-check the artifacts exist in addition to the exit status.
        client_rand_file = os.path.join(certPath, "client.key.rand")
        client_cipher_file = os.path.join(certPath, "client.key.cipher")
        if not os.path.exists(client_rand_file):
            self.logger.logExit("Failed to generate client.key.rand.\n" + output)
        if not os.path.exists(client_cipher_file):
            self.logger.logExit("Failed to generate client.key.cipher.\n" + output)

        # Clear the password (best effort; string copies may still exist).
        passwd = ""
        del passwd

        # Make all cert files read-only for the owner.
        chmod_cmd = f'chmod 400 {certPath}/*'
        status, output = subprocess.getstatusoutput(chmod_cmd)
        if status != 0:
            self.logger.logExit("Failed to set readonly for cert files.\n" + output)

        self._cleanUselessFile()

    def _distributeCA(self):
        self.logger.debug("Distributing CM ca files to other hosts.")
        certPath = os.path.join(self.gaussHome, "share/sslcert/cm")
        createCertPathCmd = "rm {certPath} -rf; mkdir -p {certPath}; chmod 700 {certPath}".format(
            certPath=certPath)
        for host in self.hostnames:
            if host == self.localhostName:
                continue
            status, output = self.executeCmdOnHost(host, createCertPathCmd)
            if status != 0:
                errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s" % (createCertPathCmd, status, output)
                self.logger.debug(errorDetail)
                self.logger.logExit("Failed to create path of CA for CM on host %s." % host)
            # 尝试从 clusterInfo 获取对应的 IP 地址
            ssh_ip = host
            try:
                if hasattr(self, 'clusterInfo') and self.clusterInfo:
                    for dbNode in self.clusterInfo.dbNodes:
                        if dbNode.name == host and hasattr(dbNode, 'sshIps') and dbNode.sshIps:
                            ssh_ip = dbNode.sshIps[0]
                            break
            except:
                pass  # 如果获取失败，继续使用原始主机名
            
            # Determine if the host is an IPv6 address and format accordingly
            if ":" in ssh_ip:
                formatted_host = "[{}]".format(ssh_ip)
            else:
                formatted_host = ssh_ip
        
            # Create the scp command with the formatted host
            scpCmd = "scp {certPath}/* {host}:{certPath}".format(certPath=certPath, host=formatted_host)
            status, output = subprocess.getstatusoutput(scpCmd)
            if status != 0:
                errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s" % (scpCmd, status, output)
                self.logger.debug(errorDetail)
                self.logger.logExit("Failed to create CA for CM.")

    def createCMCA(self):
        self.logger.log("Creating CM ca files.")
        self._createCMCALocal()
        self._distributeCA()

    def run(self):
        self.logger.log("Start to install cm tool.")
        self.prepareCMPath()
        self.decompressCMPkg()
        self.createManualStartFile()
        self.initCMServer()
        self.initCMAgent()
        self.createCMCA()
        self.setMonitorCrontab()
        self.AddGrResource()
        self.startCluster()
class Install:
    """
    Driver for deploying the CM (Cluster Manager) tool to an existing
    openGauss cluster.

    Typical flow (see run()): validate the executing user and command
    line, load environment variables, verify cluster/CM state, then hand
    off to InstallImpl for the actual installation steps.
    """

    def __init__(self):
        # Paths resolved from the command line / environment file.
        self.envFile = ""
        self.xmlFile = ""
        self.gaussHome = ""
        self.gaussLog = ""
        self.toolPath = ""
        self.tmpPath = ""
        # Per-node CM install directories and hostnames parsed from the XML.
        self.cmDirs = []
        self.hostnames = []
        self.localhostName = ""
        self.cmpkg = ""
        # hostname -> {"dataPath": ..., "port": ...} from the static config.
        self.nodesInfo = dict()
        # Cluster state discovered by checkCluster().
        self.clusterStopped = False
        self.maxTerm = 0
        self.primaryTermAbnormal = False
        self.primary = ""

    def getLocalhostName(self):
        """Cache the local hostname on this instance."""
        self.localhostName = socket.gethostname()

    def getEnvParams(self):
        """Load required installation paths from the environment file."""
        self.gaussHome = getEnvParam(self.envFile, "GAUSSHOME")
        self.gaussLog = getEnvParam(self.envFile, "GAUSSLOG")
        self.toolPath = getEnvParam(self.envFile, "GPHOME")
        self.tmpPath = getEnvParam(self.envFile, "PGHOST")

    def checkExeUser(self):
        """Exit when executed as root; CM must be installed by the cluster user."""
        if os.getuid() == 0:
            CMLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50105"])

    def usage(self):
        """
cm_install is a utility to deploy CM tool to openGauss database cluster.

Usage:
    cm_install -? | --help
    cm_install -X XMLFILE [-e envFile] --cmpkg=cmpkgPath
General options:
    -X                                 Path of the XML configuration file.
    -e                                 Path of env file.
                                       Default value "~/.bashrc".
    --cmpkg                            Path of CM pacakage.
    -?, --help                         Show help information for this
                                       utility, and exit the command line mode.
        """
        print(self.usage.__doc__)

    def parseCommandLine(self):
        """Parse command-line options into instance attributes."""
        if len(sys.argv) == 1:
            self.usage()
            sys.exit(1)

        try:
            opts, args = getopt.getopt(sys.argv[1:], "?X:e:", ["help", "cmpkg="])
        except getopt.GetoptError as e:
            CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(e))

        # NOTE: the previous `opt in ("-X")` form was a substring test on a
        # bare string (missing tuple comma); use exact equality instead.
        for opt, value in opts:
            if opt in ("-?", "--help"):
                self.usage()
                sys.exit(0)
            elif opt == "-X":
                self.xmlFile = value
            elif opt == "-e":
                self.envFile = value
            elif opt == "--cmpkg":
                self.cmpkg = value

    def checkParam(self):
        """Validate -X, --cmpkg and -e values; resolve the effective env file."""
        if self.xmlFile == "":
            CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % 'X' + ".")
        checkXMLFile(self.xmlFile)

        if self.cmpkg == "":
            CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % '-cmpkg' + ".")
        if not os.path.exists(self.cmpkg):
            CMLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50201"] % self.cmpkg)
        if not os.path.isfile(self.cmpkg):
            CMLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50210"] % ("cmpkg " + self.cmpkg))

        if self.envFile == "":
            self.envFile = os.path.join(os.environ['HOME'], ".bashrc")
        if not os.path.exists(self.envFile):
            CMLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50201"] % ("envFile " + self.envFile))
        if not os.path.isfile(self.envFile):
            CMLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50210"] % ("envFile " + self.envFile))
        # A separated env file (MPPDB_ENV_SEPARATE_PATH) overrides -e.
        mppdbEnv = getEnvParam(self.envFile, "MPPDB_ENV_SEPARATE_PATH")
        if mppdbEnv != "":
            self.envFile = mppdbEnv
        if self.envFile == "" or not os.path.exists(self.envFile) or not os.path.isfile(self.envFile):
            CMLog.exitWithError(ErrorCode.GAUSS_518["GAUSS_51802"] % 'MPPDB_ENV_SEPARATE_PATH' + ".")

    def checkOm(self):
        """
        Check whether the om tool (gr_om) is available; exit when missing.
        """
        cmd = "source %s; gr_om --version" % self.envFile
        status, output = subprocess.getstatusoutput(cmd)
        if status != 0:
            errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s\n" % (
                cmd, status, output)
            self.logger.logExit("OM tool is required." + errorDetail)

    def checkXMLFileSecurity(self):
        """
        function : check XML contain DTDs
        input : String
        output : NA
        """
        # Check xml for security requirements:
        # reject files containing "<!DOCTYPE" or "<!ENTITY" to avoid
        # XML entity-expansion / external-entity attacks.
        try:
            with open(self.xmlFile, "r", encoding='utf-8') as fb:
                lines = fb.readlines()
            for line in lines:
                if re.findall("<!DOCTYPE", line) or re.findall("<!ENTITY", line):
                    raise Exception("File have security risks.")
        except Exception as e:
            raise Exception(str(e))

    def initParserXMLFile(self):
        """
        function : Init parser xml file
        input : String
        output : Object (the XML root node)
        """
        try:
            # check xml for security requirements
            self.checkXMLFileSecurity()
            dom_tree = ETree.parse(self.xmlFile)
            rootNode = dom_tree.getroot()
        except Exception as e:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51236"] + " Error: \n%s." % str(e))
        return rootNode

    def generateXml(self):
        """
        function : generate xml file via the gr_om tool
        input : String
        output : Object
        """
        cmd = "source %s; ./gr_om generate-xml %s" % (self.envFile, self.xmlFile)
        status, output = subprocess.getstatusoutput(cmd)
        if status != 0:
            self.logger.logExit((ErrorCode.GAUSS_514["GAUSS_51400"] % cmd) + f"\nStatus:{status}\nOutput:{output}")

    def getInfoListOfAllNodes(self):
        """
        get hostname and cmDir list of all nodes
        check other CM infos in xml
        TODO: check the consistence of xml and installed cluster.
        """
        self.localhostName = getLocalhostName()

        # get hostnames and port from static file
        cmd = "source %s; ./gr_om view %s" % (self.envFile, self.xmlFile)
        status, output = subprocess.getstatusoutput(cmd)
        if status != 0:
            self.logger.logExit((ErrorCode.GAUSS_514["GAUSS_51400"] % cmd) + \
                f"\nStatus:{status}\nOutput:{output}")
        nodesStaticInfoStr = re.split("azName.*:.*", output)
        if len(nodesStaticInfoStr) == 0:
            self.logger.logExit("Failed to get cluster info from static file.")
        if len(nodesStaticInfoStr) < 2:
            self.logger.logExit("CM is not supported in single instance.")
        nodesStaticInfo = nodesStaticInfoStr[1:]
        for nodeInfo in nodesStaticInfo:
            if nodeInfo == "":
                continue

            # Parse node name with error checking
            nodename_match = re.findall("nodeName:(.*)", nodeInfo)
            if not nodename_match:
                self.logger.logExit(f"Failed to parse nodeName from nodeInfo: {nodeInfo}")
            nodename = nodename_match[0]
            self.hostnames.append(nodename)

            # Parse data path with error checking
            dataPath_match = re.findall("datanodeLocalDataPath.*:(.*)", nodeInfo)
            if not dataPath_match:
                self.logger.logExit(f"Failed to parse datanodeLocalDataPath from nodeInfo: {nodeInfo}")
            dataPath = dataPath_match[0]

            # Parse port with error checking
            port_match = re.findall("datanodePort.*:(.*)", nodeInfo)
            if not port_match:
                self.logger.logExit(f"Failed to parse datanodePort from nodeInfo: {nodeInfo}")
            port = port_match[0]

            self.nodesInfo[nodename] = {"dataPath": dataPath, "port": port}

        # get node info from XML
        hostnamesInXML = []
        rootNode = self.initParserXMLFile()
        elementName = 'DEVICELIST'
        if not rootNode.findall('DEVICELIST'):
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51200"] % elementName)
        deviceArray = rootNode.findall('DEVICELIST')[0]
        deviceNodes = deviceArray.findall('DEVICE')
        cmDict = {"cmsNum": "", "cmServerPortBase": "", "cmServerPortStandby": "",
             "cmServerlevel": "", "cmServerListenIp1": "", "cmServerRelation": ""}
        for dev in deviceNodes:
            paramList = dev.findall('PARAM')
            for param in paramList:
                paraName = param.attrib['name']
                paraValue = param.attrib['value']
                if paraName == 'name':
                    hostnamesInXML.append(paraValue)
                elif paraName == 'cmDir':
                    self.cmDirs.append(paraValue)
                elif paraName == 'cmServerLevel':
                    cmDict['cmServerlevel'] = paraValue
                elif paraName in cmDict.keys():
                    cmDict[paraName] = paraValue
        # check whether XML contains all nodes info
        # (order-sensitive by design: XML must list nodes in static-file order)
        if self.hostnames != hostnamesInXML:
            self.logger.logExit("XML info is not consistent with static file.")
        # check params in xml (cmServerPortStandby is optional)
        for item in cmDict:
            if item == 'cmServerPortStandby':
                continue
            if cmDict[item] == "":
                self.logger.logExit(ErrorCode.GAUSS_512["GAUSS_51200"] % item)
        if cmDict['cmsNum'] != '1':
            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmsNum')
        if cmDict['cmServerlevel'] != '1':
            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerlevel')
        if not cmDict['cmServerPortBase'].isdigit():
            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortBase')
        if cmDict['cmServerPortStandby'] != "" and not cmDict['cmServerPortStandby'].isdigit():
            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortStandby')
        if len(self.hostnames) != len(self.cmDirs):
            self.logger.logExit("\"cmDir\" of all nodes must be provided.")

    def checkHostTrust(self):
        """Verify passwordless SSH trust to every cluster host."""
        checkHostsTrust(self.hostnames)

    def initLogger(self):
        """Create the cm_tool log directory (under GAUSSLOG) and the logger.

        Requires self.gaussLog to be populated (see getEnvParams), otherwise
        the log directory would silently become a cwd-relative path.
        """
        logPath = os.path.join(self.gaussLog, "cm", "cm_tool")
        # exist_ok avoids a check-then-create race.
        os.makedirs(logPath, exist_ok=True)
        self.logger = CMLog(logPath, "cm_install", "cm_install")

    def checkCM(self):
        """
        Check whether there is CM in current cluster.
        """
        checkCMExistCmd = "source %s; cm_ctl query -Cv | " \
            "grep 'CMServer State' > /dev/null" % self.envFile
        status, output = subprocess.getstatusoutput(checkCMExistCmd)
        if status == 0:
            self.logger.logExit("CM exists in current cluster.")

    def checkCluster(self):
        """
        check the status of the current cluster
        """
        cmd = "source %s; gr_om -t status --detail" % self.envFile
        status, output = subprocess.getstatusoutput(cmd)
        if status != 0:
            errorDetail = "Detail:\nCommand:\n" + cmd + "\noutput:" + output
            self.logger.logExit(ErrorCode.GAUSS_516["GAUSS_51600"] + errorDetail)
        if "cluster_state   : Unavailable" in output:
            # It's permitted to deploy CM tool when cluster is stopped,
            # but not permitted when cluster is unavailable.
            if output.count("Manually stopped") == len(self.hostnames):
                self.clusterStopped = True
                return
            self.logger.logExit("The cluster is unavailable currently.")
        if "cluster_state   : Normal" not in output:
            self.logger.logExit("Cluster is running but its status is abnormal.")
        # check whether term of primary is invalid and biggest.
        primaryCount = 0
        primaryTerm = 0
        sqlCmd = "select term from pg_last_xlog_replay_location();"
        for host in self.hostnames:
            isLocal = False
            if host == self.localhostName:
                isLocal = True
            findPrimaryCmd = "source %s; gs_ctl query -D %s | grep -i 'local_role.*Primary' > /dev/null" % \
                (self.envFile, self.nodesInfo[host]["dataPath"])
            notPrimary, output = executeCmdOnHost(host, findPrimaryCmd, isLocal)
            if notPrimary == 0:
                primaryCount += 1
            getTermLsnCmd = "source %s; gsql -d postgres -p %s -tA -c '%s'" % \
                (self.envFile, self.nodesInfo[host]["port"], sqlCmd)
            status, term = executeCmdOnHost(host, getTermLsnCmd, isLocal)
            if status != 0:
                self.logger.logExit("Failed to get term of host %s." % host)
            if notPrimary == 0:
                primaryTerm = int(term)
            if self.maxTerm < int(term):
                self.maxTerm = int(term)

        if primaryCount != 1:
            self.logger.logExit("The number of primary is invalid.")
        if primaryTerm == 0 or primaryTerm < self.maxTerm:
            self.primaryTermAbnormal = True
            self.logger.warn("Term of primary is invalid or not maximal.\n"
                "Hint: it seems that the cluster is newly installed, so it's "
                "recommended to deploy CM tool while installing the cluster.")

    def run(self):
        """Entry point: validate input, inspect the cluster, install CM."""
        self.checkExeUser()
        self.parseCommandLine()
        self.checkParam()
        # getEnvParams() must run before initLogger(): the log directory is
        # derived from GAUSSLOG, which is empty until the env file is read.
        self.getEnvParams()
        self.initLogger()
        self.generateXml()
        self.checkCM()
        self.getInfoListOfAllNodes()
        self.getLocalhostName()
        # self.checkHostTrust()

        # Best effort: build the global hostname -> IP map so that remote
        # commands can target SSH IPs instead of bare hostnames.
        try:
            clusterInfo = dbClusterInfo()
            clusterInfo.initFromXml(self.xmlFile)
            hostname_ip_map = {}
            for dbNode in clusterInfo.dbNodes:
                if hasattr(dbNode, 'sshIps') and dbNode.sshIps:
                    hostname_ip_map[dbNode.name] = dbNode.sshIps[0]
            set_hostname_ip_map(hostname_ip_map)
        except Exception:
            # The map is only an optimization; fall back to hostnames.
            pass

        installImpl = InstallImpl(self)
        installImpl.run()

####################################################################
## Helper functions for reading cluster information
####################################################################

def checkPathVaild(obtainpath):
    """
    Validate that a path contains no shell-unsafe characters.

    Args:
        obtainpath (str): path to validate; blank paths are accepted.

    Raises:
        Exception: when any illegal character is present.
    """
    illegal_chars = (" ", "|", ";", "&", "$", "<", ">", "`", "\\", "'", "\"",
                     "{", "}", "(", ")", "[", "]", "~", "*", "?", "!", "\n")
    if not obtainpath.strip():
        # Empty / whitespace-only paths are treated as harmless.
        return
    for ill_char in illegal_chars:
        if ill_char in obtainpath:
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50219"] % obtainpath +
                            " There are illegal characters in the path.")

def getEnvParam(envFile, param):
    """
    Get environment parameter value from environment file.

    Args:
        envFile (str): Path to the environment file
        param (str): Parameter name to retrieve

    Returns:
        str: Parameter value (empty string when the variable is unset)
    """
    cmd = "source {envFile}; echo ${param}".format(envFile=envFile, param=param)
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        # Include the failing command's detail in the fatal message; it used
        # to be computed but never used, hiding the root cause from the user.
        errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s\n" % (
            cmd, status, output)
        CMLog.exitWithError(ErrorCode.GAUSS_518["GAUSS_51802"] % param + errorDetail)
    return output

def getLocalhostName():
    """
    Return the local machine's hostname.

    Returns:
        str: hostname as reported by socket.gethostname()
    """
    hostname = socket.gethostname()
    return hostname

# Module-level hostname -> IP mapping, consulted by executeCmdOnHost when
# choosing an SSH target.
_hostname_to_ip_map = {}

def set_hostname_ip_map(hostname_ip_map):
    """
    Install the global hostname-to-IP mapping.

    Args:
        hostname_ip_map (dict): maps hostnames to SSH-reachable IP addresses
    """
    # Rebind the module global rather than mutating in place, so any code
    # holding a reference to the previous dict is left untouched.
    global _hostname_to_ip_map
    _hostname_to_ip_map = hostname_ip_map

def executeCmdOnHost(host, cmd, isLocal=False):
    """
    Run a command locally or on a remote host via SSH.

    Args:
        host (str): target hostname or IP address (ignored when isLocal)
        cmd (str): shell command to execute
        isLocal (bool): execute on the local machine instead of via SSH

    Returns:
        tuple: (status, output) - command exit status and combined output
    """
    if isLocal:
        return subprocess.getstatusoutput(cmd)
    # Prefer a resolved IP from the global hostname map when one is known.
    target = _hostname_to_ip_map.get(host, host)
    remote_cmd = 'ssh -q -o ConnectTimeout=5 %s \"%s\"' % (target, cmd)
    return subprocess.getstatusoutput(remote_cmd)

def checkXMLFile(xmlFile):
    """
    Check XML file validity.

    Performs the following checks:
    1. Check whether XML file exists
    2. Check whether XML file is a regular file
    3. Check read permission

    Args:
        xmlFile (str): Path to the XML file (may start with "~")

    Raises:
        SystemExit: via CMLog.exitWithError when any validation fails
    """
    # expanduser covers "~/", bare "~" and "~user" forms, replacing the
    # previous hand-rolled handling of the "~/" prefix only.
    xmlFile = os.path.expanduser(xmlFile)
    if not os.path.exists(xmlFile):
        CMLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50201"] % ("xmlFile " + xmlFile))
    if not os.path.isfile(xmlFile):
        CMLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50210"] % ("xmlFile " + xmlFile))
    if not os.access(xmlFile, os.R_OK):
        CMLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50100"] % (xmlFile, "current user"))

def checkHostsTrust(hosts):
    """
    Verify passwordless SSH trust from the current host to every given host.

    Args:
        hosts (list): hostnames to probe

    Raises:
        SystemExit: via CMLog.exitWithError listing all untrusted hosts
    """
    def _trusted(host):
        probe = (
            "ssh -o ConnectTimeout=3 -o ConnectionAttempts=5 "
            "-o PasswordAuthentication=no -o StrictHostKeyChecking=no "
            f"{host} 'pwd > /dev/null'"
        )
        return subprocess.getstatusoutput(probe)[0] == 0

    untrusted = [host for host in hosts if not _trusted(host)]
    if untrusted:
        CMLog.exitWithError(ErrorCode.GAUSS_511["GAUSS_51100"] % ','.join(untrusted))

def GetHostIpOrName():
    """
    Return the local IP address, or the hostname as a fallback.

    Resolution order:
      1. hostname, when the HOST_IP environment variable is absent;
      2. HOST_IP, when set and a valid IP literal;
      3. getaddrinfo() for the local hostname - the LAST returned address
         wins (original behavior, preserved), IPv4 or IPv6.
    """
    env_dist = os.environ
    if "HOST_IP" not in env_dist:
        return getHostName()
    host_ip = env_dist.get("HOST_IP")
    if host_ip is not None and isIpValid(host_ip):
        return host_ip
    # The previous `except Exception as e: raise e` was a no-op that only
    # truncated tracebacks, so the try block has been removed entirely.
    addr_info = socket.getaddrinfo(socket.gethostname(), None)
    for info in addr_info:
        # info[4] is the sockaddr tuple; its first element is the address.
        host_ip = info[ADDRESS_FAMILY_INDEX][IP_ADDRESS_INDEX]
    return host_ip

def getHostName():
    """
    Return the host name by running the external `hostname` command.

    Raises:
        Exception: when the command exits non-zero.
    """
    host_cmd = findCmdInPath("hostname")
    status, output = subprocess.getstatusoutput(host_cmd)
    if status == 0:
        return output
    # Command failed: surface the command that was attempted.
    raise Exception(ErrorCode.GAUSS_502["GAUSS_50219"] % "host name"
                    + "The cmd is %s" % host_cmd)


def isIpValid(ip_address):
    """
    Return True when *ip_address* parses as a literal IPv4/IPv6 address.
    """
    try:
        ipaddress.ip_address(ip_address)
    except ValueError:
        return False
    return True

def get_ip_version(ip_address):
    """
    Classify *ip_address* as NET_IPV4, NET_IPV6, or "" when it is not a
    literal IP address (e.g. a hostname or malformed string).
    """
    try:
        version = ipaddress.ip_address(ip_address).version
    except ValueError:
        # Not a literal IP; possibly a hostname.
        return ""
    # ip_address() only ever yields version 4 or 6.
    return NET_IPV4 if version == 4 else NET_IPV6

def createFileInSafeMode(file_path, mode=stat.S_IWUSR | stat.S_IRUSR):
    """
    Create an empty file with 0o600 permissions, creating parent
    directories as needed. An already-existing file is left untouched.

    Args:
        file_path (str): path of the file to create
        mode (int): permission bits for the new file (default 0o600)

    Raises:
        Exception: on permission errors or other OS failures
    """
    try:
        parent = os.path.dirname(file_path)
        # os.makedirs("") raises FileNotFoundError, so skip the directory
        # step for bare filenames (empty dirname).
        if parent:
            os.makedirs(parent, exist_ok=True)

        # O_EXCL makes create-and-chmod atomic: no window with wrong perms.
        fd = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode)
        os.close(fd)
    except FileExistsError:
        pass  # File already exists; nothing to do.
    except PermissionError as e:
        raise Exception(f"No permission to operate path: {file_path}. Error:\n{str(e)}")
    except OSError as e:
        raise Exception(f"Failed to create file: {file_path}. Error:\n{str(e)}")


def removeDirectory(path):
    """
    Recursively delete *path* with `rm -rf`.

    Paths containing "*" are re-quoted so the glob expands in the shell;
    all other paths are passed single-quoted verbatim.

    Returns:
        bool: True on success.
    Raises:
        Exception: when the rm command fails.
    """
    remove_cmd = getRemoveCmd("directory")
    if "*" in path:
        path = withAsteriskPath(path)
        cmd = "%s %s" % (remove_cmd, path)
    else:
        cmd = "%s '%s'" % (remove_cmd, path)
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50209"] % path +
                        " Error:\n%s." % output + "The cmd is %s" % cmd)
    return True

def withAsteriskPath(path):
    """
    Quote a path for the shell so components containing "*" stay outside
    the single quotes (allowing glob expansion) while everything else is
    single-quoted.

    Args:
        path (str): the path to quote (normalized via realpath)

    Returns:
        str: the shell-quoted path
    """
    components = os.path.realpath(path).split(os.path.sep)[1:]
    pieces = ["'"]
    for component in components:
        if "*" in component:
            # Close the quote around the glob component.
            pieces.append("'" + os.path.sep + component + "'")
        else:
            pieces.append(os.path.sep + component)
    quoted = "".join(pieces)
    # Balance the quoting: a trailing quote left by a glob component is
    # dropped; otherwise the opening quote is terminated.
    if quoted.endswith("'"):
        return quoted[:-1]
    return quoted + "'"

def getRemoveCmd(path_type):
    """
    Return the absolute `rm` command with flags suited to *path_type*.

    Args:
        path_type (str): "file" (-f), "directory" (-rf), else no flags

    Returns:
        str: e.g. "/bin/rm -rf "
    """
    flags = {"file": " -f ", "directory": " -rf "}.get(path_type, " ")
    return findCmdInPath('rm') + flags

def findCmdInPath(cmd, additional_paths=None, print_error=True):
    """
    Locate an executable, searching additional_paths then CMD_PATH.

    Results found in CMD_PATH are memoized in CMD_CACHE; hits from
    additional_paths are intentionally NOT cached. "killall" gets a
    special fallback under GPHOME. Raises CommandNotFoundException when
    nothing is found.
    """
    global CMD_CACHE
    if additional_paths is None:
        additional_paths = []
    if cmd not in CMD_CACHE:
        # Search additional paths and don't add to cache.
        for p in additional_paths:
            f = os.path.join(p, cmd)
            if os.path.exists(f):
                return f

        # Standard locations; first hit wins and is cached.
        for p in CMD_PATH:
            f = os.path.join(p, cmd)
            if os.path.exists(f):
                CMD_CACHE[cmd] = f
                return f

        # killall may ship with the tool itself under GPHOME/script.
        if cmd == "killall":
            gphome = os.getenv("GPHOME")
            if gphome is None or \
                    not os.path.exists(os.path.join(gphome, "script/killall")):
                # Fall back to a path relative to this source file.
                gphome = os.path.dirname(os.path.realpath(__file__)) \
                            + "/../../.."
            # Escape characters unsafe for later shell interpolation.
            gphome = gphome.replace("\\", "\\\\").replace('"', '\\"\\"')
            #SecurityChecker.check_injection_char(gphome)
            if gphome != "" and os.path.exists(os.path.join(gphome,
                                                            "script/killall")):
                return os.path.join(gphome, "script/killall")
            else:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % "killall")

        if print_error:
            print('Command %s not found' % cmd)
        search_path = CMD_PATH[:]
        search_path.extend(additional_paths)
        raise CommandNotFoundException(cmd, search_path)
    else:
        return CMD_CACHE[cmd]

def getEnvironmentParameterValue(environment_parameter_name, user, env_file=None):
    """
    Read $<environment_parameter_name> under *user*'s profile.

    Args:
        environment_parameter_name (str): variable name (no leading '$')
        user (str): user to run as when executing as root
        env_file (str): profile to source; defaults to getMpprcFile()

    Returns:
        str: first line of the value, escaped for embedding in
        double-quoted shell strings; "" when the command fails.
    """
    user_profile = env_file if env_file is not None else getMpprcFile()
    execute_cmd = "echo $%s" % environment_parameter_name
    cmd = getExecuteCmdWithUserProfile(user, user_profile, execute_cmd)
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        return ""
    env_value = output.split("\n")[0]
    return env_value.replace("\\", "\\\\").replace('"', '\\"\\"')

def getMpprcFile():
    """
    Locate the effective environment (mpprc) file.

    Order: MPPDB_ENV_SEPARATE_PATH when set (must be an existing absolute
    path), /etc/profile for root, else the user's ~/.bashrc. Non-root
    results must be regular files.
    """
    mpprc_file = getEnv("MPPDB_ENV_SEPARATE_PATH")
    if mpprc_file is not None and mpprc_file != "":
        if not os.path.isabs(mpprc_file):
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51206"] % mpprc_file)
        if not os.path.exists(mpprc_file):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % mpprc_file)
    elif os.getuid() == 0:
        # Root uses the system profile; deliberately no isfile() check here.
        return "/etc/profile"
    else:
        mpprc_file = os.path.join(getUserHomePath(), ".bashrc")
    if not os.path.isfile(mpprc_file):
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50210"] % mpprc_file)
    return mpprc_file

def getUserHomePath():
    """
    Return the current user's home directory via shell tilde expansion.

    Raises:
        Exception: when the shell command fails.
    """
    status, output = subprocess.getstatusoutput("echo ~ 2>/dev/null")
    if status != 0:
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50219"] % "user home")
    return output

def getExecuteCmdWithUserProfile(user, user_profile, execute_cmd,
                                     ignore_error=True):
    """
    Build a shell command that sources *user_profile* then runs *execute_cmd*.

    When running as root on behalf of a named *user*, the command is wrapped
    in `su - user -c`. With ignore_error, stderr is discarded.
    """
    run_as_other_user = (user != "") and (os.getuid() == 0)
    if run_as_other_user:
        cmd = "su - %s -c 'source %s; %s'" % (user, user_profile, execute_cmd)
    else:
        cmd = "source %s; %s" % (user_profile, execute_cmd)
    return cmd + " 2>/dev/null" if ignore_error else cmd

def getEnv(env_param, default_value=None):
    """
    Read an environment variable, escaping characters that are unsafe to
    embed in double-quoted shell strings.

    Args:
        env_param (str): variable name
        default_value: value returned (unescaped) when the variable is unset

    Returns:
        escaped value, the default, or None when unset with no default
    """
    env_value = os.getenv(env_param)

    if env_value is None:
        # `is not None` (rather than truthiness) so falsy defaults such as
        # "" or 0 are honored instead of being silently dropped.
        if default_value is not None:
            return default_value
        return None

    env_value = env_value.replace("\\", "\\\\").replace('"', '\\"\\"')

    # SecurityChecker.check_injection_char(env_value)

    return env_value

def isdigit(c):
    """Return True when *c* is an ASCII digit ('0'..'9'); unlike
    str.isdigit() this rejects non-ASCII Unicode digits."""
    return '0' <= c <= '9'

def islower(c):
    """Return True when *c* is an ASCII lowercase letter ('a'..'z')."""
    return 'a' <= c <= 'z'

def isupper(c):
    """Return True when *c* is an ASCII uppercase letter ('A'..'Z')."""
    return 'A' <= c <= 'Z'

def json_to_xml(json_file):
    """
    Convert a JSON cluster configuration file to the XML format expected
    by the installer.

    Args:
        json_file (str): path to the JSON configuration

    Returns:
        tuple: (xml_path, env_file, user) on success, or (None, None, None)
        when the JSON file cannot be read. The XML is written to a
        NamedTemporaryFile that the caller is responsible for deleting.
    """
    import json
    import tempfile
    import os

    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except Exception as e:
        print(f"Error reading JSON file: {e}")
        return None, None, None

    # Extract env_file and user information.
    env_file = config.get('cluster', {}).get('env_file', '')
    user = config.get('cluster', {}).get('user', '')

    # Build the node list strings.
    # NOTE: values are interpolated without XML escaping - assumes names,
    # ips and paths contain no XML-special characters (TODO confirm).
    node_names = [node['name'] for node in config['cluster']['nodes']]
    back_ips = [node['ip'] for node in config['cluster']['nodes']]
    gr_nodes_list = []
    for i, node in enumerate(config['cluster']['nodes']):
        gr_nodes_list.append(f"{i}:{node['ip']}:{node['gr_port']}")

    # Build the XML document.
    xml_content = f'''<?xml version="1.0" encoding="UTF-8"?>
<ROOT>
    <CLUSTER>
        <PARAM name="clusterName" value="{config['cluster']['name']}"/>
        <PARAM name="nodeNames" value="{','.join(node_names)}"/>
        <PARAM name="installPath" value="{config['cluster']['install_path']}"/>
        <PARAM name="backIp1s" value="{','.join(back_ips)}"/>
        <PARAM name="gr_nodes_list" value="{','.join(gr_nodes_list)}"/>
        <PARAM name="wormPath" value="{config['cluster'].get('worm_path', '/tmp/')}"/>
    </CLUSTER>
    <DEVICELIST>'''

    # Generate a DEVICE section per node.
    for i, node in enumerate(config['cluster']['nodes']):
        xml_content += f'''
        <DEVICE sn="{node['name']}">
            <PARAM name="name" value="{node['name']}"/>
            <PARAM name="azName" value="AZ1"/>
            <PARAM name="azPriority" value="1"/>
            <PARAM name="backIp1" value="{node['ip']}"/>
            <PARAM name="sshIp1" value="{node['ip']}"/>
            <PARAM name="cmDir" value="{config['cluster']['install_path']}cm"/>
            <PARAM name="grIp1" value="{node['ip']}"/>
            <PARAM name="listen_addr" value="{node['ip']}:{node['gr_port'] + 10}"/>'''

        # The first node additionally carries the CM parameters.
        if i == 0:
            other_nodes = [n['name'] for n in config['cluster']['nodes'][1:]]
            # All node IPs (the CM server listens on every node's IP).
            listen_ips = [n['ip'] for n in config['cluster']['nodes']]

            # Build dataNode1 dynamically from the node count. Format:
            # ${install_path}data/dn,${name[1]},${install_path}data/dn,${name[2]},...
            # BUGFIX: the inner loop variable used to be named `node`,
            # shadowing the outer loop's `node` and corrupting the
            # cmServerRelation value below (last node instead of first).
            data_node_parts = []
            for j, dn_node in enumerate(config['cluster']['nodes']):
                if j > 0:  # node names are listed from the second node on
                    data_node_parts.append(dn_node['name'])
                data_node_parts.append(f"{config['cluster']['install_path']}data/dn")
            dataNode1_value = ','.join(data_node_parts)

            xml_content += f'''
            <PARAM name="cmsNum" value="1"/>
            <PARAM name="cmServerPortBase" value="{config['cluster']['nodes'][0]['gr_port'] + 20}"/>
            <PARAM name="cmServerListenIp1" value="{','.join(listen_ips)}"/>
            <PARAM name="cmServerlevel" value="1"/>
            <PARAM name="cmServerRelation" value="{','.join([node['name']] + other_nodes)}"/>
            <PARAM name="dataNum" value="1"/>
            <PARAM name="dataPortBase" value="{config['cluster']['nodes'][0]['gr_port'] + 30}"/>
            <PARAM name="dataNode1" value="{dataNode1_value}"/>'''
        xml_content += '''
        </DEVICE>'''

    xml_content += '''
    </DEVICELIST>
</ROOT>'''

    # Write the XML to a temporary file; caller owns (and removes) it.
    temp_xml = tempfile.NamedTemporaryFile(mode='w', suffix='.xml', delete=False, encoding='utf-8')
    temp_xml.write(xml_content)
    temp_xml.close()

    return temp_xml.name, env_file, user

def _load_json_cli_config():
    """Preprocess sys.argv for a JSON configuration file.

    Sub-commands that consume JSON directly (trust, uninstall, gr_certs,
    preinstall) only need the env_file/user values read out of the JSON;
    every other sub-command gets the JSON converted to a temporary XML
    file, and the matching sys.argv entry is rewritten to point at it.

    Returns:
        tuple: (xml_file, json_env_file, json_user) where xml_file is
        the temporary XML path to delete after the command runs (None
        when no conversion happened).
    """
    xml_file = None
    json_env_file = None
    json_user = None

    if len(sys.argv) <= 1:
        return xml_file, json_env_file, json_user

    # These sub-commands read the JSON config themselves; skip XML conversion.
    is_json_command = ('trust' in sys.argv or 'uninstall' in sys.argv or
                       'gr_certs' in sys.argv or 'preinstall' in sys.argv)

    for i, arg in enumerate(sys.argv):
        if not arg.endswith('.json'):
            continue
        json_file = arg
        if is_json_command:
            # Still need env_file/user from the JSON for later validation.
            # NOTE: 'json' is already imported at module level; the original
            # re-imported it locally, which was redundant.
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                    json_env_file = config.get('cluster', {}).get('env_file', '')
                    json_user = config.get('cluster', {}).get('user', '')
            except Exception as e:
                print(f"Error reading JSON file: {e}")
        else:
            # Convert the JSON config into the XML format expected downstream.
            xml_file, json_env_file, json_user = json_to_xml(json_file)
            if xml_file:
                print(f"Converted JSON config '{json_file}' to XML: {xml_file}")
                # Replace the JSON path on the command line with the XML path.
                sys.argv[i] = xml_file
            else:
                print(f"Failed to convert JSON file: {json_file}")
                sys.exit(1)
        # Only the first .json argument is honored, matching prior behavior.
        break
    return xml_file, json_env_file, json_user


def _build_arg_parser():
    """Create the argument parser with all oGRecorder OM sub-commands."""
    parser = argparse.ArgumentParser(description='oGRecorder OM Tool')
    subparsers = parser.add_subparsers(dest='command', required=True)

    # View command
    view_parser = subparsers.add_parser('view', help='View cluster information')
    view_parser.add_argument('xmlFile', help='XML configuration file (or JSON file)')

    # Generate-xml command
    gen_parser = subparsers.add_parser('generate-xml', help='Generate XML configuration')
    gen_parser.add_argument('xmlFile', help='XML configuration file (or JSON file)')

    # Install All command (GR + CM)
    install_all_parser = subparsers.add_parser('install', help='Install both GR and CM packages')
    install_all_parser.add_argument('-X', dest='xmlFile', required=True, help='XML configuration file (or JSON file)')
    install_all_parser.add_argument('--grpkg', dest='gr_package', required=True, help='Path to GR installation package')
    install_all_parser.add_argument('--cmpkg', dest='cm_package', required=True, help='Path to CM package')

    # GR uninstall command
    uninstall_parser = subparsers.add_parser('uninstall', help='Uninstall cluster')
    uninstall_parser.add_argument('-X', dest='config_file', required=True, help='JSON configuration file path')

    # GR certs command
    gr_certs_parser = subparsers.add_parser('gr_certs', help='Generate and distribute GR certs, then reload')
    gr_certs_parser.add_argument('-X', dest='config_file', required=True, help='JSON configuration file path')

    # GR preinstall command
    preinstall_parser = subparsers.add_parser('preinstall', help='Preinstall cluster')
    preinstall_parser.add_argument('-X', dest='config_file', required=True, help='JSON configuration file path')

    # SSH trust command
    trust_parser = subparsers.add_parser('trust', help='Automatically configure SSH mutual trust for cluster')
    trust_parser.add_argument('-X', dest='config_file', required=True, help='JSON configuration file path')

    return parser


def _dispatch(clusterInfo, args):
    """Run the handler that matches the parsed sub-command.

    Args:
        clusterInfo: a dbClusterInfo instance providing the operations.
        args: the parsed argparse.Namespace (with envFile/user already
            injected from the JSON config where required).
    """
    if args.command == 'view':
        clusterInfo.initFromXml(args.xmlFile)
        file_path = "%s/bin/cluster_static_config" % (clusterInfo.appPath)
        clusterInfo.printStaticConfig(args.xmlFile, file_path)
    elif args.command == 'generate-xml':
        clusterInfo.initFromXml(args.xmlFile)
        clusterInfo.doRebuildConf(args.xmlFile)
    elif args.command == 'install':
        # GR installation first, then the CM package via the Install helper.
        clusterInfo.gr_install(args.xmlFile, args.gr_package)
        install = Install()
        install.xmlFile = args.xmlFile
        install.envFile = args.envFile
        install.cmpkg = args.cm_package
        install.run()
    elif args.command == 'preinstall':
        clusterInfo.preinstall(args.config_file, args.user, args.envFile)
    elif args.command == 'uninstall':
        clusterInfo.uninstall(args.config_file)
    elif args.command == 'gr_certs':
        clusterInfo.distribute_gr_certs(args.config_file, args.envFile)
    elif args.command == 'trust':
        clusterInfo.setup_ssh_trust(args.config_file)
    else:
        # Unreachable with required=True subparsers; kept as a safeguard.
        print("Invalid command")
        sys.exit(1)


def main():
    """CLI entry point for the oGRecorder OM tool.

    Preprocesses any JSON config argument (converting it to a temporary
    XML file when the sub-command needs XML), parses the command line,
    validates the env_file/user values that some sub-commands require,
    dispatches the command, and finally removes any temporary XML file
    it created.
    """
    clusterInfo = dbClusterInfo()

    # Check the command line for a JSON configuration file and prepare it.
    xml_file, json_env_file, json_user = _load_json_cli_config()

    args = _build_arg_parser().parse_args()

    # These commands rely on env_file coming from the JSON configuration.
    if args.command in ['install', 'gr_certs', 'preinstall']:
        if not json_env_file:
            print("Error: env_file not found in JSON configuration")
            sys.exit(1)
        args.envFile = json_env_file
        print(f"Using env file from JSON config: {json_env_file}")

    # preinstall additionally needs the cluster user from the JSON config.
    if args.command == 'preinstall':
        if not json_user:
            print("Error: user not found in JSON configuration")
            sys.exit(1)
        args.user = json_user
        print(f"Using user from JSON config: {json_user}")

    try:
        _dispatch(clusterInfo, args)
    finally:
        # Clean up the temporary XML file generated from the JSON config.
        if xml_file and os.path.exists(xml_file):
            try:
                os.unlink(xml_file)
                print(f"Cleaned up temporary XML file: {xml_file}")
            except OSError:
                # Best-effort removal only; a cleanup failure must never
                # mask the command's real outcome (the original used a
                # bare except, which could even swallow SystemExit).
                pass

# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()